foundationdb/transaction.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::metrics::{FdbCommand, TransactionMetrics};
22use crate::options;
23
24use crate::{FdbError, FdbResult, error};
25use foundationdb_macros::cfg_api_versions;
26
27use crate::error::{FdbBindingError, TransactionMetricsNotFound};
28
29use futures::{
30 Future, FutureExt, Stream, TryFutureExt, TryStreamExt, future, future::Either, stream,
31};
32
/// Raw bytes of the `\xff/metadataVersion` system key.
///
/// Gated by `cfg_api_versions(min = 610)`.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";

/// Special keyspace prefix for conflicting keys.
const CONFLICTING_KEYS_PREFIX: &[u8] = b"\xff\xff/transaction/conflicting_keys/";
// End key of the conflicting-keys special keyspace: the prefix above followed by `\xff\xff`.
// Matches C++ SystemData.cpp conflictingKeysRange end key.
const CONFLICTING_KEYS_END: &[u8] = b"\xff\xff/transaction/conflicting_keys/\xff\xff";
40
/// A half-open key range (`begin` inclusive, `end` exclusive) that conflicted
/// during a transaction commit.
///
/// Produced by [`Transaction::conflicting_keys`] after a commit conflict, provided
/// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
/// was enabled on the transaction.
///
/// The special keyspace encodes conflicting ranges using boundary markers:
/// - Value `b"1"` marks the inclusive start of a conflicting range
/// - Value `b"0"` marks the exclusive end of a conflicting range
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConflictingKeyRange {
    begin: Vec<u8>,
    end: Vec<u8>,
}

impl ConflictingKeyRange {
    /// First key of the conflicting range (inclusive).
    pub fn begin(&self) -> &[u8] {
        self.begin.as_slice()
    }

    /// Key just past the conflicting range (exclusive).
    pub fn end(&self) -> &[u8] {
        self.end.as_slice()
    }
}

/// Renders the range as `begin..end`, decoding both keys as lossy UTF-8
/// (invalid byte sequences become U+FFFD).
impl fmt::Display for ConflictingKeyRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let begin = String::from_utf8_lossy(&self.begin);
        let end = String::from_utf8_lossy(&self.end);
        write!(f, "{begin}..{end}")
    }
}
78
79/// A committed transaction.
80#[derive(Debug)]
81#[repr(transparent)]
82pub struct TransactionCommitted {
83 tr: Transaction,
84}
85
86impl TransactionCommitted {
87 /// Retrieves the database version number at which a given transaction was committed.
88 ///
89 /// Read-only transactions do not modify the database when committed and will have a committed
90 /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
91 /// current values may be optimized to a read-only transaction.
92 ///
93 /// Note that database versions are not necessarily unique to a given transaction and so cannot
94 /// be used to determine in what order two transactions completed. The only use for this
95 /// function is to manually enforce causal consistency when calling `set_read_version()` on
96 /// another subsequent transaction.
97 ///
98 /// Most applications will not call this function.
99 pub fn committed_version(&self) -> FdbResult<i64> {
100 let mut version: i64 = 0;
101 error::eval(unsafe {
102 fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
103 })?;
104 Ok(version)
105 }
106
107 /// Reset the transaction to its initial state.
108 ///
109 /// This will not affect previously committed data.
110 ///
111 /// This is similar to dropping the transaction and creating a new one.
112 pub fn reset(mut self) -> Transaction {
113 self.tr.reset();
114 self.tr
115 }
116}
117impl From<TransactionCommitted> for Transaction {
118 fn from(tc: TransactionCommitted) -> Transaction {
119 tc.reset()
120 }
121}
122
123/// A failed to commit transaction.
124pub struct TransactionCommitError {
125 tr: Transaction,
126 err: FdbError,
127}
128
129impl TransactionCommitError {
130 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
131 /// which of the error codes generated by other `Transaction` functions represent temporary
132 /// error conditions and which represent application errors that should be handled by the
133 /// application. It also implements an exponential backoff strategy to avoid swamping the
134 /// database cluster with excessive retries when there is a high level of conflict between
135 /// transactions.
136 ///
137 /// You should not call this method most of the times and use `Database::transact` which
138 /// implements a retry loop strategy for you.
139 pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
140 FdbFuture::<()>::new(unsafe {
141 fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
142 })
143 .map_ok(|()| self.tr)
144 }
145
146 /// Reads the conflicting key ranges that caused this commit failure.
147 ///
148 /// Only returns meaningful results if
149 /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
150 /// was set on the transaction **and** the error is `not_committed` (code 1020).
151 ///
152 /// Must be called **before** [`on_error`](Self::on_error) which resets the transaction.
153 ///
154 /// # Errors
155 ///
156 /// Returns an `FdbError` if the special keyspace read fails.
157 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158 pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
159 self.tr.conflicting_keys().await
160 }
161
162 /// Reset the transaction to its initial state.
163 ///
164 /// This is similar to dropping the transaction and creating a new one.
165 pub fn reset(mut self) -> Transaction {
166 self.tr.reset();
167 self.tr
168 }
169}
170
171impl Deref for TransactionCommitError {
172 type Target = FdbError;
173 fn deref(&self) -> &FdbError {
174 &self.err
175 }
176}
177
178impl From<TransactionCommitError> for FdbError {
179 fn from(tce: TransactionCommitError) -> FdbError {
180 tce.err
181 }
182}
183
184impl fmt::Debug for TransactionCommitError {
185 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
186 write!(f, "TransactionCommitError({})", self.err)
187 }
188}
189
190impl fmt::Display for TransactionCommitError {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 self.err.fmt(f)
193 }
194}
195
196impl std::error::Error for TransactionCommitError {}
197
/// The result of `Transaction::commit`: a [`TransactionCommitted`] on success, or a
/// [`TransactionCommitError`] when the commit failed.
type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
200
201/// A cancelled transaction
202#[derive(Debug)]
203#[repr(transparent)]
204pub struct TransactionCancelled {
205 tr: Transaction,
206}
207impl TransactionCancelled {
208 /// Reset the transaction to its initial state.
209 ///
210 /// This is similar to dropping the transaction and creating a new one.
211 pub fn reset(mut self) -> Transaction {
212 self.tr.reset();
213 self.tr
214 }
215}
216impl From<TransactionCancelled> for Transaction {
217 fn from(tc: TransactionCancelled) -> Transaction {
218 tc.reset()
219 }
220}
221
/// In FoundationDB, a transaction is a mutable snapshot of a database.
///
/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
///
/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
///
/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
///
/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
#[derive(Debug)]
pub struct Transaction {
    // Order of fields should not be changed, because Rust drops field top-to-bottom, and
    // transaction should be dropped before cluster.
    // NOTE(review): no cluster handle is stored in this struct as declared here — confirm
    // whether the ordering constraint above is still relevant.
    /// Raw, non-null handle to the underlying `FDBTransaction`.
    inner: NonNull<fdb_sys::FDBTransaction>,
    /// Metrics collector; `None` when built with `Transaction::new`, `Some` when built with
    /// `Transaction::new_instrumented`.
    metrics: Option<TransactionMetrics>,
}
// NOTE(review): these impls assert that the raw handle may be moved to and shared between
// threads; presumably this relies on the thread-safety of the FoundationDB C client — confirm.
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
240
241/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
242#[inline]
243fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
244 if v { 1 } else { 0 }
245}
/// Converts a buffer length into the `c_int` expected by the C API.
///
/// `context` names the buffer and is only used in the panic message.
///
/// # Panics
///
/// Panics when `len` does not fit in an `i32`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    i32::try_from(len).unwrap_or_else(|_| panic!("{context}.len() > i32::MAX"))
}
/// Converts an iteration counter into the `c_int` expected by the C API.
///
/// Counters that do not fit in an `i32` are mapped to 0, which will cause
/// `client_invalid_operation`.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a limit into the `c_int` expected by the C API, saturating at `i32::MAX`.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    let capped = v.min(i32::MAX as usize);
    capped as i32
}
267
/// `RangeOption` represents the query parameters for a range scan query.
///
/// You can construct `RangeOption` easily:
///
/// ```
/// use foundationdb::RangeOption;
///
/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
/// let opt = RangeOption {
///     limit: Some(10),
///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RangeOption<'a> {
    /// The beginning of the range.
    pub begin: KeySelector<'a>,
    /// The end of the range.
    pub end: KeySelector<'a>,
    /// If non-zero, indicates the maximum number of key-value pairs to return.
    ///
    /// `None` is passed to the C API as 0 by `Transaction::get_range`.
    pub limit: Option<usize>,
    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
    /// return for each item.
    pub target_bytes: usize,
    /// One of the options::StreamingMode values indicating how the caller would like the data in
    /// the range returned.
    pub mode: options::StreamingMode,
    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
    /// the end of the range.
    pub reverse: bool,
    // Hidden public marker field; the documented way to build a value is struct-update syntax
    // on top of a `From`/`Default` value, as in the example above.
    #[doc(hidden)]
    pub __non_exhaustive: std::marker::PhantomData<()>,
}
302
303impl RangeOption<'_> {
304 /// Reverses the range direction.
305 pub fn rev(mut self) -> Self {
306 self.reverse = !self.reverse;
307 self
308 }
309
310 pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
311 if !kvs.more() {
312 return None;
313 }
314
315 let last = kvs.last()?;
316 let last_key = last.key();
317
318 if let Some(limit) = self.limit.as_mut() {
319 *limit = limit.saturating_sub(kvs.len());
320 if *limit == 0 {
321 return None;
322 }
323 }
324
325 if self.reverse {
326 self.end.make_first_greater_or_equal(last_key);
327 } else {
328 self.begin.make_first_greater_than(last_key);
329 }
330 Some(self)
331 }
332
333 #[cfg_api_versions(min = 710)]
334 pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
335 if !kvs.more() {
336 return None;
337 }
338
339 let last = kvs.last()?;
340 let last_key = last.parent_key();
341
342 if let Some(limit) = self.limit.as_mut() {
343 *limit = limit.saturating_sub(kvs.len());
344 if *limit == 0 {
345 return None;
346 }
347 }
348
349 if self.reverse {
350 self.end.make_first_greater_or_equal(last_key);
351 } else {
352 self.begin.make_first_greater_than(last_key);
353 }
354 Some(self)
355 }
356}
357
358impl Default for RangeOption<'_> {
359 fn default() -> Self {
360 Self {
361 begin: KeySelector::first_greater_or_equal([].as_ref()),
362 end: KeySelector::first_greater_or_equal([].as_ref()),
363 limit: None,
364 target_bytes: 0,
365 mode: options::StreamingMode::Iterator,
366 reverse: false,
367 __non_exhaustive: std::marker::PhantomData,
368 }
369 }
370}
371
372impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
373 fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
374 Self {
375 begin,
376 end,
377 ..Self::default()
378 }
379 }
380}
381impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
382 fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
383 Self {
384 begin: KeySelector::first_greater_or_equal(begin),
385 end: KeySelector::first_greater_or_equal(end),
386 ..Self::default()
387 }
388 }
389}
390impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
391 fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
392 Self {
393 begin: KeySelector::first_greater_or_equal(begin),
394 end: KeySelector::first_greater_or_equal(end),
395 ..Self::default()
396 }
397 }
398}
399impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
400 fn from(range: Range<KeySelector<'a>>) -> Self {
401 RangeOption::from((range.start, range.end))
402 }
403}
404
405impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
406 fn from(range: Range<&'a [u8]>) -> Self {
407 RangeOption::from((range.start, range.end))
408 }
409}
410
411impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
412 fn from(range: Range<Vec<u8>>) -> Self {
413 RangeOption::from((range.start, range.end))
414 }
415}
416
417impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
418 fn from(range: RangeInclusive<&'a [u8]>) -> Self {
419 let (start, end) = range.into_inner();
420 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
421 }
422}
423
424impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
425 fn from(range: RangeInclusive<Vec<u8>>) -> Self {
426 let (start, end) = range.into_inner();
427 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
428 }
429}
430
431impl Transaction {
432 pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
433 Self {
434 inner,
435 metrics: None,
436 }
437 }
438
439 pub fn new_instrumented(
440 inner: NonNull<fdb_sys::FDBTransaction>,
441 metrics: TransactionMetrics,
442 ) -> Self {
443 Self {
444 inner,
445 metrics: Some(metrics),
446 }
447 }
448
449 /// Called to set an option on an FDBTransaction.
450 pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
451 unsafe { opt.apply(self.inner.as_ptr()) }
452 }
453
454 /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
455 /// where the code and data will be provided as raw, in order to avoid deserializing to an option
456 /// and serializing it back to code and data.
457 /// In general, you should use `set_option`.
458 pub fn set_raw_option(
459 &self,
460 code: fdb_sys::FDBTransactionOption,
461 data: Option<Vec<u8>>,
462 ) -> FdbResult<()> {
463 let (data_ptr, size) = data
464 .as_ref()
465 .map(|data| {
466 (
467 data.as_ptr(),
468 i32::try_from(data.len()).expect("len to fit in i32"),
469 )
470 })
471 .unwrap_or_else(|| (std::ptr::null(), 0));
472 let err = unsafe {
473 fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
474 };
475 if err != 0 {
476 Err(FdbError::from_code(err))
477 } else {
478 Ok(())
479 }
480 }
481
482 /// Modify the database snapshot represented by transaction to change the given
483 /// key to have the given value.
484 ///
485 /// If the given key was not previously present in the database it is inserted.
486 /// The modification affects the actual database only if transaction is later
487 /// committed with `Transaction::commit`.
488 ///
489 /// # Arguments
490 ///
491 /// * `key` - the name of the key to be inserted into the database.
492 /// * `value` - the value to be inserted into the database
493 #[cfg_attr(
494 feature = "trace",
495 tracing::instrument(level = "debug", skip(self, key, value))
496 )]
497 pub fn set(&self, key: &[u8], value: &[u8]) {
498 unsafe {
499 fdb_sys::fdb_transaction_set(
500 self.inner.as_ptr(),
501 key.as_ptr(),
502 fdb_len(key.len(), "key"),
503 value.as_ptr(),
504 fdb_len(value.len(), "value"),
505 )
506 }
507
508 if let Some(metrics) = &self.metrics {
509 metrics.report_metrics(FdbCommand::Set((key.len() + value.len()) as u64));
510 }
511 }
512
513 /// Modify the database snapshot represented by transaction to remove the given key from the
514 /// database.
515 ///
516 /// If the key was not previously present in the database, there is no effect. The modification
517 /// affects the actual database only if transaction is later committed with
518 /// `Transaction::commit`.
519 ///
520 /// # Arguments
521 ///
522 /// * `key` - the name of the key to be removed from the database.
523 #[cfg_attr(
524 feature = "trace",
525 tracing::instrument(level = "debug", skip(self, key))
526 )]
527 pub fn clear(&self, key: &[u8]) {
528 unsafe {
529 fdb_sys::fdb_transaction_clear(
530 self.inner.as_ptr(),
531 key.as_ptr(),
532 fdb_len(key.len(), "key"),
533 )
534 }
535
536 if let Some(metrics) = &self.metrics {
537 metrics.report_metrics(FdbCommand::Clear);
538 }
539 }
540
541 /// Reads a value from the database snapshot represented by transaction.
542 ///
543 /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
544 ///
545 /// # Arguments
546 ///
547 /// * `key` - the name of the key to be looked up in the database
548 /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
549 #[cfg_attr(
550 feature = "trace",
551 tracing::instrument(level = "debug", skip(self, key))
552 )]
553 pub fn get(
554 &self,
555 key: &[u8],
556 snapshot: bool,
557 ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin + use<> {
558 let metrics = self.metrics.clone();
559 let lenght_key = key.len();
560
561 FdbFuture::<Option<FdbSlice>>::new(unsafe {
562 fdb_sys::fdb_transaction_get(
563 self.inner.as_ptr(),
564 key.as_ptr(),
565 fdb_len(key.len(), "key"),
566 fdb_bool(snapshot),
567 )
568 })
569 .map(move |result| {
570 if let Ok(value) = &result {
571 if let Some(metrics) = metrics.as_ref() {
572 let (bytes_count, kv_fetched) = if let Some(values) = value {
573 ((lenght_key + values.len()) as u64, 1)
574 } else {
575 (lenght_key as u64, 0)
576 };
577 metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
578 }
579 }
580 result
581 })
582 }
583
584 /// Modify the database snapshot represented by transaction to perform the operation indicated
585 /// by operationType with operand param to the value stored by the given key.
586 ///
587 /// An atomic operation is a single database command that carries out several logical steps:
588 /// reading the value of a key, performing a transformation on that value, and writing the
589 /// result. Different atomic operations perform different transformations. Like other database
590 /// operations, an atomic operation is used within a transaction; however, its use within a
591 /// transaction will not cause the transaction to conflict.
592 ///
593 /// Atomic operations do not expose the current value of the key to the client but simply send
594 /// the database the transformation to apply. In regard to conflict checking, an atomic
595 /// operation is equivalent to a write without a read. It can only cause other transactions
596 /// performing reads of the key to conflict.
597 ///
598 /// By combining these logical steps into a single, read-free operation, FoundationDB can
599 /// guarantee that the transaction will not conflict due to the operation. This makes atomic
600 /// operations ideal for operating on keys that are frequently modified. A common example is
601 /// the use of a key-value pair as a counter.
602 ///
603 /// # Warning
604 ///
605 /// If a transaction uses both an atomic operation and a strictly serializable read on the same
606 /// key, the benefits of using the atomic operation (for both conflict checking and performance)
607 /// are lost.
608 #[cfg_attr(
609 feature = "trace",
610 tracing::instrument(level = "debug", skip(self, key, param))
611 )]
612 pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
613 unsafe {
614 fdb_sys::fdb_transaction_atomic_op(
615 self.inner.as_ptr(),
616 key.as_ptr(),
617 fdb_len(key.len(), "key"),
618 param.as_ptr(),
619 fdb_len(param.len(), "param"),
620 op_type.code(),
621 )
622 }
623
624 if let Some(metrics) = &self.metrics {
625 metrics.report_metrics(FdbCommand::Atomic);
626 }
627 }
628
629 /// Resolves a key selector against the keys in the database snapshot represented by
630 /// transaction.
631 ///
632 /// Returns an FDBFuture which will be set to the key in the database matching the key
633 /// selector.
634 ///
635 /// # Arguments
636 ///
637 /// * `selector`: the key selector
638 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
639 #[cfg_attr(
640 feature = "trace",
641 tracing::instrument(level = "debug", skip(self, selector, snapshot))
642 )]
643 pub fn get_key(
644 &self,
645 selector: &KeySelector,
646 snapshot: bool,
647 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
648 let key = selector.key();
649 FdbFuture::new(unsafe {
650 fdb_sys::fdb_transaction_get_key(
651 self.inner.as_ptr(),
652 key.as_ptr(),
653 fdb_len(key.len(), "key"),
654 fdb_bool(selector.or_equal()),
655 selector.offset(),
656 fdb_bool(snapshot),
657 )
658 })
659 }
660
661 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
662 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
663 /// equal to the key resolved by the begin key selector and lexicographically less than the key
664 /// resolved by the end key selector.
665 ///
666 /// Returns a stream of KeyValue slices.
667 ///
668 /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
669 /// use.
670 ///
671 /// # Arguments
672 ///
673 /// * `opt`: the range, limit, target_bytes and mode
674 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
675 #[cfg_attr(
676 feature = "trace",
677 tracing::instrument(level = "debug", skip(self, opt, snapshot))
678 )]
679 pub fn get_ranges<'a>(
680 &'a self,
681 opt: RangeOption<'a>,
682 snapshot: bool,
683 ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
684 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
685 if let Some(opt) = maybe_opt {
686 Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
687 move |maybe_values| {
688 let next_opt = match &maybe_values {
689 Ok(values) => opt.next_range(values),
690 Err(..) => None,
691 };
692 Some((maybe_values, (iteration + 1, next_opt)))
693 },
694 ))
695 } else {
696 Either::Right(future::ready(None))
697 }
698 })
699 }
700
701 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
702 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
703 /// equal to the key resolved by the begin key selector and lexicographically less than the key
704 /// resolved by the end key selector.
705 ///
706 /// Returns a stream of KeyValue.
707 ///
708 /// # Arguments
709 ///
710 /// * `opt`: the range, limit, target_bytes and mode
711 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
712 #[cfg_attr(
713 feature = "trace",
714 tracing::instrument(level = "debug", skip(self, opt, snapshot))
715 )]
716 pub fn get_ranges_keyvalues<'a>(
717 &'a self,
718 opt: RangeOption<'a>,
719 snapshot: bool,
720 ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
721 self.get_ranges(opt, snapshot)
722 .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
723 .try_flatten()
724 }
725
726 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
727 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
728 /// equal to the key resolved by the begin key selector and lexicographically less than the key
729 /// resolved by the end key selector.
730 ///
731 /// <div class="warning">
732 /// This method returns <strong>only a single batch</strong> of results, not necessarily all
733 /// key-value pairs in the range. It is a low-level primitive for manual pagination. Most
734 /// callers should use
735 /// <a href="struct.Transaction.html#method.get_ranges_keyvalues"><code>get_ranges_keyvalues</code></a>,
736 /// which automatically pages through the full range and yields every key-value pair as a stream.
737 /// </div>
738 ///
739 /// # Arguments
740 ///
741 /// * `opt`: the range, limit, target_bytes and mode
742 /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
743 /// by 1 for each successive call while reading this range. In all other cases it is ignored.
744 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
745 #[cfg_attr(
746 feature = "trace",
747 tracing::instrument(level = "debug", skip(self, opt, snapshot))
748 )]
749 pub fn get_range(
750 &self,
751 opt: &RangeOption,
752 iteration: usize,
753 snapshot: bool,
754 ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin + use<> {
755 let begin = &opt.begin;
756 let end = &opt.end;
757 let key_begin = begin.key();
758 let key_end = end.key();
759
760 let metrics = self.metrics.clone();
761
762 FdbFuture::<FdbValues>::new(unsafe {
763 fdb_sys::fdb_transaction_get_range(
764 self.inner.as_ptr(),
765 key_begin.as_ptr(),
766 fdb_len(key_begin.len(), "key_begin"),
767 fdb_bool(begin.or_equal()),
768 begin.offset(),
769 key_end.as_ptr(),
770 fdb_len(key_end.len(), "key_end"),
771 fdb_bool(end.or_equal()),
772 end.offset(),
773 fdb_limit(opt.limit.unwrap_or(0)),
774 fdb_limit(opt.target_bytes),
775 opt.mode.code(),
776 fdb_iteration(iteration),
777 fdb_bool(snapshot),
778 fdb_bool(opt.reverse),
779 )
780 })
781 .map(move |result| {
782 if let (Ok(values), Some(metrics)) = (&result, metrics) {
783 let kv_fetched = values.len();
784 let mut bytes_count = 0;
785
786 for key_value in values.as_ref() {
787 let key_len = key_value.key().len();
788 let value_len = key_value.value().len();
789
790 bytes_count += (key_len + value_len) as u64
791 }
792
793 metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched as u64));
794 };
795
796 result
797 })
798 }
799
800 /// Mapped Range is an experimental feature introduced in FDB 7.1.
801 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
802 /// In such a case, querying records by scanning an index in relational databases can be
803 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
804 ///
805 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
806 /// this can happen in one request without additional back and forth. Considering the overhead
807 /// of each request, this saves time and resources on serialization, deserialization, and network.
808 ///
809 /// A mapped request will:
810 ///
811 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
812 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
813 /// * Put all results in a nested structure and return them.
814 ///
815 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
816 /// using snapshot isolation AND disabling read-your-writes.
817 ///
818 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
819 ///
820 /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
821 #[cfg_api_versions(min = 710)]
822 #[cfg_attr(
823 feature = "trace",
824 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
825 )]
826 pub fn get_mapped_range(
827 &self,
828 opt: &RangeOption,
829 mapper: &[u8],
830 iteration: usize,
831 snapshot: bool,
832 ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + use<> {
833 let begin = &opt.begin;
834 let end = &opt.end;
835 let key_begin = begin.key();
836 let key_end = end.key();
837
838 FdbFuture::new(unsafe {
839 fdb_sys::fdb_transaction_get_mapped_range(
840 self.inner.as_ptr(),
841 key_begin.as_ptr(),
842 fdb_len(key_begin.len(), "key_begin"),
843 fdb_bool(begin.or_equal()),
844 begin.offset(),
845 key_end.as_ptr(),
846 fdb_len(key_end.len(), "key_end"),
847 fdb_bool(end.or_equal()),
848 end.offset(),
849 mapper.as_ptr(),
850 fdb_len(mapper.len(), "mapper_length"),
851 fdb_limit(opt.limit.unwrap_or(0)),
852 fdb_limit(opt.target_bytes),
853 opt.mode.code(),
854 fdb_iteration(iteration),
855 fdb_bool(snapshot),
856 fdb_bool(opt.reverse),
857 )
858 })
859 }
860
861 /// Mapped Range is an experimental feature introduced in FDB 7.1.
862 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
863 /// In such a case, querying records by scanning an index in relational databases can be
864 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
865 ///
866 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
867 /// this can happen in one request without additional back and forth. Considering the overhead
868 /// of each request, this saves time and resources on serialization, deserialization, and network.
869 ///
870 /// A mapped request will:
871 ///
872 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
873 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
874 /// * Put all results in a nested structure and return them.
875 ///
876 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
877 /// using snapshot isolation AND disabling read-your-writes.
878 ///
879 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
880 #[cfg_api_versions(min = 710)]
881 #[cfg_attr(
882 feature = "trace",
883 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
884 )]
885 pub fn get_mapped_ranges<'a>(
886 &'a self,
887 opt: RangeOption<'a>,
888 mapper: &'a [u8],
889 snapshot: bool,
890 ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
891 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
892 if let Some(opt) = maybe_opt {
893 Either::Left(
894 self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
895 .map(move |maybe_values| {
896 let next_opt = match &maybe_values {
897 Ok(values) => opt.next_mapped_range(values),
898 Err(..) => None,
899 };
900 Some((maybe_values, (iteration + 1, next_opt)))
901 }),
902 )
903 } else {
904 Either::Right(future::ready(None))
905 }
906 })
907 }
908
909 /// Modify the database snapshot represented by transaction to remove all keys (if any) which
910 /// are lexicographically greater than or equal to the given begin key and lexicographically
911 /// less than the given end_key.
912 ///
913 /// The modification affects the actual database only if transaction is later committed with
914 /// `Transaction::commit`.
915 #[cfg_attr(
916 feature = "trace",
917 tracing::instrument(level = "debug", skip(self, begin, end))
918 )]
919 pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
920 unsafe {
921 fdb_sys::fdb_transaction_clear_range(
922 self.inner.as_ptr(),
923 begin.as_ptr(),
924 fdb_len(begin.len(), "begin"),
925 end.as_ptr(),
926 fdb_len(end.len(), "end"),
927 )
928 }
929
930 if let Some(metrics) = &self.metrics {
931 metrics.report_metrics(FdbCommand::ClearRange);
932 }
933 }
934
935 /// Get the estimated byte size of the key range based on the byte sample collected by FDB
936 #[cfg_api_versions(min = 630)]
937 pub fn get_estimated_range_size_bytes(
938 &self,
939 begin: &[u8],
940 end: &[u8],
941 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
942 FdbFuture::<i64>::new(unsafe {
943 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
944 self.inner.as_ptr(),
945 begin.as_ptr(),
946 fdb_len(begin.len(), "begin"),
947 end.as_ptr(),
948 fdb_len(end.len(), "end"),
949 )
950 })
951 }
952
953 /// Attempts to commit the sets and clears previously applied to the database snapshot
954 /// represented by transaction to the actual database.
955 ///
956 /// The commit may or may not succeed – in particular, if a conflicting transaction previously
957 /// committed, then the commit must fail in order to preserve transactional isolation. If the
958 /// commit does succeed, the transaction is durably committed to the database and all
959 /// subsequently started transactions will observe its effects.
960 ///
961 /// It is not necessary to commit a read-only transaction – you can simply drop it.
962 ///
963 /// Callers will usually want to retry a transaction if the commit or a another method on the
964 /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
965 ///
966 /// As with other client/server databases, in some failure scenarios a client may be unable to
967 /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
968 /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
969 /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
970 /// could execute the transaction twice. In these cases, you must consider the idempotence of
971 /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
972 ///
973 /// Normally, commit will wait for outstanding reads to return. However, if those reads were
974 /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
975 /// any outstanding reads will immediately return errors.
976 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
977 pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
978 FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
979 move |r| match r {
980 Ok(()) => Ok(TransactionCommitted { tr: self }),
981 Err(err) => Err(TransactionCommitError { tr: self, err }),
982 },
983 )
984 }
985
986 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
987 /// which of the error codes generated by other `Transaction` functions represent temporary
988 /// error conditions and which represent application errors that should be handled by the
989 /// application. It also implements an exponential backoff strategy to avoid swamping the
990 /// database cluster with excessive retries when there is a high level of conflict between
991 /// transactions.
992 ///
993 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
994 /// transaction has already been reset.
995 ///
996 /// You should not call this method most of the times and use `Database::transact` which
997 /// implements a retry loop strategy for you.
998 pub fn on_error(
999 self,
1000 err: FdbError,
1001 ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
1002 FdbFuture::<()>::new(unsafe {
1003 fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
1004 })
1005 .map_ok(|()| self)
1006 }
1007
1008 /// Cancels the transaction. All pending or future uses of the transaction will return a
1009 /// transaction_cancelled error. The transaction can be used again after it is reset.
1010 pub fn cancel(self) -> TransactionCancelled {
1011 unsafe {
1012 fdb_sys::fdb_transaction_cancel(self.inner.as_ptr());
1013 }
1014 TransactionCancelled { tr: self }
1015 }
1016
1017 /// Sets a custom metric for the transaction with the specified name, value, and labels.
1018 ///
1019 /// Custom metrics allow you to track application-specific metrics during transaction execution.
1020 /// These metrics are collected alongside the standard FoundationDB metrics and can be used
1021 /// for monitoring, debugging, and performance analysis.
1022 ///
1023 /// # Arguments
1024 /// * `name` - The name of the metric (e.g., "query_time", "cache_hits")
1025 /// * `value` - The value to set for the metric
1026 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1027 /// (e.g., `[("operation", "read"), ("region", "us-west")]`)
1028 ///
1029 /// # Returns
1030 /// * `Ok(())` if the metric was set successfully
1031 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1032 ///
1033 /// # Example
1034 /// ```
1035 /// # use foundationdb::*;
1036 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1037 /// # let db = Database::default()?;
1038 /// let metrics = TransactionMetrics::new();
1039 /// let txn = db.create_instrumented_trx(metrics)?;
1040 ///
1041 /// // Set a custom metric with labels
1042 /// txn.set_custom_metric("query_processing_time", 42, &[("query_type", "select"), ("table", "users")])?;
1043 /// # Ok(())
1044 /// # }
1045 /// ```
1046 ///
1047 /// # Note
1048 /// This method only works on transactions created with `create_instrumented_trx()` or within
1049 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1050 /// return `Err(TransactionMetricsNotFound)`.
1051 pub fn set_custom_metric(
1052 &self,
1053 name: &str,
1054 value: u64,
1055 labels: &[(&str, &str)],
1056 ) -> Result<(), TransactionMetricsNotFound> {
1057 if let Some(metrics) = &self.metrics {
1058 metrics.set_custom(name, value, labels);
1059 Ok(())
1060 } else {
1061 Err(TransactionMetricsNotFound)
1062 }
1063 }
1064
1065 /// Increments a custom metric for the transaction by the specified amount.
1066 ///
1067 /// This is useful for counting events or accumulating values during transaction execution.
1068 /// If the metric doesn't exist yet, it will be created with the specified amount.
1069 /// If it already exists with the same labels, its value will be incremented.
1070 ///
1071 /// # Arguments
1072 /// * `name` - The name of the metric to increment (e.g., "requests", "bytes_processed")
1073 /// * `amount` - The amount to increment the metric by
1074 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1075 /// (e.g., `[("status", "success"), ("endpoint", "api/v1/users")]`)
1076 ///
1077 /// # Returns
1078 /// * `Ok(())` if the metric was incremented successfully
1079 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1080 ///
1081 /// # Example
1082 /// ```
1083 /// # use foundationdb::*;
1084 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1085 /// # let db = Database::default()?;
1086 /// let metrics = TransactionMetrics::new();
1087 /// let txn = db.create_instrumented_trx(metrics)?;
1088 ///
1089 /// // Increment a counter each time a specific operation occurs
1090 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1091 ///
1092 /// // Later in the transaction, increment it again
1093 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1094 ///
1095 /// // The final value will be 2
1096 /// # Ok(())
1097 /// # }
1098 /// ```
1099 ///
1100 /// # Note
1101 /// This method only works on transactions created with `create_instrumented_trx()` or within
1102 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1103 /// return `Err(TransactionMetricsNotFound)`.
1104 pub fn increment_custom_metric(
1105 &self,
1106 name: &str,
1107 amount: u64,
1108 labels: &[(&str, &str)],
1109 ) -> Result<(), TransactionMetricsNotFound> {
1110 if let Some(metrics) = &self.metrics {
1111 metrics.increment_custom(name, amount, labels);
1112 Ok(())
1113 } else {
1114 Err(TransactionMetricsNotFound)
1115 }
1116 }
1117
1118 /// Returns a list of public network addresses as strings, one for each of the storage servers
1119 /// responsible for storing key_name and its associated value.
1120 pub fn get_addresses_for_key(
1121 &self,
1122 key: &[u8],
1123 ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin + use<> {
1124 FdbFuture::new(unsafe {
1125 fdb_sys::fdb_transaction_get_addresses_for_key(
1126 self.inner.as_ptr(),
1127 key.as_ptr(),
1128 fdb_len(key.len(), "key"),
1129 )
1130 })
1131 }
1132
1133 /// A watch's behavior is relative to the transaction that created it. A watch will report a
1134 /// change in relation to the key’s value as readable by that transaction. The initial value
1135 /// used for comparison is either that of the transaction’s read version or the value as
1136 /// modified by the transaction itself prior to the creation of the watch. If the value changes
1137 /// and then changes back to its initial value, the watch might not report the change.
1138 ///
1139 /// Until the transaction that created it has been committed, a watch will not report changes
1140 /// made by other transactions. In contrast, a watch will immediately report changes made by
1141 /// the transaction itself. Watches cannot be created if the transaction has set the
1142 /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
1143 /// watches_disabled error.
1144 ///
1145 /// If the transaction used to create a watch encounters an error during commit, then the watch
1146 /// will be set with that error. A transaction whose commit result is unknown will set all of
1147 /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
1148 /// destroyed, then any watches it created will be set with the transaction_cancelled error.
1149 ///
1150 /// Returns an future representing an empty value that will be set once the watch has
1151 /// detected a change to the value at the specified key.
1152 ///
1153 /// By default, each database connection can have no more than 10,000 watches that have not yet
1154 /// reported a change. When this number is exceeded, an attempt to create a watch will return a
1155 /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
1156 /// Because a watch outlives the transaction that creates it, any watch that is no longer
1157 /// needed should be cancelled by dropping its future.
1158 pub fn watch(
1159 &self,
1160 key: &[u8],
1161 ) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin + use<> {
1162 FdbFuture::new(unsafe {
1163 fdb_sys::fdb_transaction_watch(
1164 self.inner.as_ptr(),
1165 key.as_ptr(),
1166 fdb_len(key.len(), "key"),
1167 )
1168 })
1169 }
1170
1171 /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
1172 /// returned future, which is the summation of the estimated size of mutations, read conflict
1173 /// ranges, and write conflict ranges.
1174 ///
1175 /// This can be called multiple times before the transaction is committed.
1176 #[cfg_api_versions(min = 620)]
1177 pub fn get_approximate_size(
1178 &self,
1179 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
1180 FdbFuture::new(unsafe {
1181 fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
1182 })
1183 }
1184
1185 /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
1186 /// Note: the returned split points contain the start key and end key of the given range.
1187 #[cfg_api_versions(min = 700)]
1188 pub fn get_range_split_points(
1189 &self,
1190 begin: &[u8],
1191 end: &[u8],
1192 chunk_size: i64,
1193 ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin + use<> {
1194 FdbFuture::<FdbKeys>::new(unsafe {
1195 fdb_sys::fdb_transaction_get_range_split_points(
1196 self.inner.as_ptr(),
1197 begin.as_ptr(),
1198 fdb_len(begin.len(), "begin"),
1199 end.as_ptr(),
1200 fdb_len(end.len(), "end"),
1201 chunk_size,
1202 )
1203 })
1204 }
1205
1206 /// Returns an FDBFuture which will be set to the versionstamp which was used by any
1207 /// versionstamp operations in this transaction.
1208 ///
1209 /// The future will be ready only after the successful completion of a call to `commit()` on
1210 /// this Transaction. Read-only transactions do not modify the database when committed and will
1211 /// result in the future completing with an error. Keep in mind that a transaction which reads
1212 /// keys and then sets them to their current values may be optimized to a read-only transaction.
1213 ///
1214 /// Most applications will not call this function.
1215 pub fn get_versionstamp(
1216 &self,
1217 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
1218 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
1219 }
1220
1221 /// The transaction obtains a snapshot read version automatically at the time of the first call
1222 /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
1223 /// compromised by transaction options) is guaranteed to represent all transactions which were
1224 /// reported committed before that call.
1225 pub fn get_read_version(
1226 &self,
1227 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
1228 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
1229 }
1230
1231 /// Sets the snapshot read version used by a transaction.
1232 ///
1233 /// This is not needed in simple cases.
1234 /// If the given version is too old, subsequent reads will fail with error_code_past_version;
1235 /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
1236 /// error_code_future_version. If any of get_*() have been called on this transaction already,
1237 /// the result is undefined.
1238 pub fn set_read_version(&self, version: i64) {
1239 unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
1240 }
1241
1242 /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
1243 /// The value of this key is sent to clients along with the read version from the proxy,
1244 /// so a client can read its value without communicating with a storage server.
1245 /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
1246 #[cfg_api_versions(min = 610)]
1247 pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
1248 match self.get(METADATA_VERSION_KEY, snapshot).await {
1249 Ok(Some(fdb_slice)) => {
1250 let value = fdb_slice.deref();
1251 // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
1252 // can we assume that it will always be the correct size?
1253 if value.len() < 8 {
1254 return Ok(None);
1255 }
1256
1257 // The 80-bits versionstamps are 10 bytes longs, and are composed of:
1258 // * 8 bytes (Transaction Version)
1259 // * followed by 2 bytes (Transaction Batch Order)
1260 // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
1261 let mut arr = [0u8; 8];
1262 arr.copy_from_slice(&value[0..8]);
1263 let transaction_version: i64 = i64::from_be_bytes(arr);
1264
1265 Ok(Some(transaction_version))
1266 }
1267 Ok(None) => Ok(None),
1268
1269 Err(err) => Err(err),
1270 }
1271 }
1272
1273 #[cfg_api_versions(min = 610)]
1274 pub fn update_metadata_version(&self) {
1275 // The param is transformed by removing the final four bytes from ``param`` and reading
1276 // those as a little-Endian 32-bit integer to get a position ``pos``.
1277 // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
1278 // versionstamp of the transaction used. The first byte of the parameter is position 0.
1279 // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
1280 let param = vec![0u8; 14];
1281 self.atomic_op(
1282 METADATA_VERSION_KEY,
1283 param.as_slice(),
1284 options::MutationType::SetVersionstampedValue,
1285 )
1286 }
1287
1288 /// Reset transaction to its initial state.
1289 ///
1290 /// In order to protect against a race condition with cancel(), this call require a mutable
1291 /// access to the transaction.
1292 ///
1293 /// This is similar to dropping the transaction and creating a new one.
1294 ///
1295 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1296 /// transaction has already been reset.
1297 pub fn reset(&mut self) {
1298 unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1299 }
1300
1301 /// Reads the conflicting key ranges from the special keyspace after a commit conflict.
1302 ///
1303 /// This method reads from `\xff\xff/transaction/conflicting_keys/` and parses the
1304 /// boundary encoding where `b"1"` marks range starts and `b"0"` marks range ends.
1305 ///
1306 /// The special keyspace read is resolved client-side — no network round-trip to the
1307 /// cluster. The future still goes through the FDB network thread event loop, but the
1308 /// data comes from an in-memory map populated during the commit response. Returns
1309 /// an empty `Vec` if
1310 /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
1311 /// was not set.
1312 ///
1313 /// # Errors
1314 ///
1315 /// Returns an `FdbError` if the special keyspace read fails.
1316 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1317 pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
1318 let opt = RangeOption::from((CONFLICTING_KEYS_PREFIX, CONFLICTING_KEYS_END));
1319 let range_result = self.get_range(&opt, 1, false).await?;
1320
1321 let prefix_len = CONFLICTING_KEYS_PREFIX.len();
1322 let mut ranges = Vec::new();
1323 let mut current_begin: Option<Vec<u8>> = None;
1324
1325 for kv in range_result.iter() {
1326 let raw_key = kv.key();
1327 let actual_key = if raw_key.len() > prefix_len {
1328 raw_key[prefix_len..].to_vec()
1329 } else {
1330 Vec::new()
1331 };
1332
1333 match kv.value() {
1334 b"1" => {
1335 current_begin = Some(actual_key);
1336 }
1337 b"0" => {
1338 if let Some(begin) = current_begin.take() {
1339 ranges.push(ConflictingKeyRange {
1340 begin,
1341 end: actual_key,
1342 });
1343 }
1344 }
1345 _ => {}
1346 }
1347 }
1348
1349 debug_assert!(
1350 current_begin.is_none(),
1351 "unpaired '1' marker in conflicting keys response"
1352 );
1353
1354 Ok(ranges)
1355 }
1356
1357 /// Adds a conflict range to a transaction without performing the associated read or write.
1358 ///
1359 /// # Note
1360 ///
1361 /// Most applications will use the serializable isolation that transactions provide by default
1362 /// and will not need to manipulate conflict ranges.
1363 pub fn add_conflict_range(
1364 &self,
1365 begin: &[u8],
1366 end: &[u8],
1367 ty: options::ConflictRangeType,
1368 ) -> FdbResult<()> {
1369 error::eval(unsafe {
1370 fdb_sys::fdb_transaction_add_conflict_range(
1371 self.inner.as_ptr(),
1372 begin.as_ptr(),
1373 fdb_len(begin.len(), "begin"),
1374 end.as_ptr(),
1375 fdb_len(end.len(), "end"),
1376 ty.code(),
1377 )
1378 })
1379 }
1380}
1381
1382impl Drop for Transaction {
1383 fn drop(&mut self) {
1384 unsafe {
1385 fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1386 }
1387 }
1388}
1389
/// A retryable transaction, generated by Database.run
///
/// The wrapped [`Transaction`] is held in an [`Arc`], so cloning this handle shares the same
/// underlying transaction instead of creating a new one.
#[derive(Clone)]
pub struct RetryableTransaction {
    // Shared ownership of the wrapped transaction.
    inner: Arc<Transaction>,
}
1395
1396impl Deref for RetryableTransaction {
1397 type Target = Transaction;
1398 fn deref(&self) -> &Transaction {
1399 self.inner.deref()
1400 }
1401}
1402
1403impl RetryableTransaction {
1404 pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1405 RetryableTransaction { inner: Arc::new(t) }
1406 }
1407
1408 pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1409 // checking weak references
1410 if Arc::weak_count(&self.inner) != 0 {
1411 return Err(FdbBindingError::ReferenceToTransactionKept);
1412 }
1413 Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1414 }
1415
1416 pub(crate) async fn on_error(
1417 self,
1418 err: FdbError,
1419 ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1420 Ok(self
1421 .take()?
1422 .on_error(err)
1423 .await
1424 .map(RetryableTransaction::new))
1425 }
1426
1427 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1428 pub(crate) async fn commit(
1429 self,
1430 ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1431 Ok(self.take()?.commit().await)
1432 }
1433}