foundationdb/transaction.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::metrics::{FdbCommand, TransactionMetrics};
22use crate::options;
23
24use crate::{error, FdbError, FdbResult};
25use foundationdb_macros::cfg_api_versions;
26
27use crate::error::{FdbBindingError, TransactionMetricsNotFound};
28
29use futures::{
30 future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
31};
32
33#[cfg_api_versions(min = 610)]
34const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
35
36/// A committed transaction.
37#[derive(Debug)]
38#[repr(transparent)]
39pub struct TransactionCommitted {
40 tr: Transaction,
41}
42
43impl TransactionCommitted {
44 /// Retrieves the database version number at which a given transaction was committed.
45 ///
46 /// Read-only transactions do not modify the database when committed and will have a committed
47 /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
48 /// current values may be optimized to a read-only transaction.
49 ///
50 /// Note that database versions are not necessarily unique to a given transaction and so cannot
51 /// be used to determine in what order two transactions completed. The only use for this
52 /// function is to manually enforce causal consistency when calling `set_read_version()` on
53 /// another subsequent transaction.
54 ///
55 /// Most applications will not call this function.
56 pub fn committed_version(&self) -> FdbResult<i64> {
57 let mut version: i64 = 0;
58 error::eval(unsafe {
59 fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
60 })?;
61 Ok(version)
62 }
63
64 /// Reset the transaction to its initial state.
65 ///
66 /// This will not affect previously committed data.
67 ///
68 /// This is similar to dropping the transaction and creating a new one.
69 pub fn reset(mut self) -> Transaction {
70 self.tr.reset();
71 self.tr
72 }
73}
74impl From<TransactionCommitted> for Transaction {
75 fn from(tc: TransactionCommitted) -> Transaction {
76 tc.reset()
77 }
78}
79
80/// A failed to commit transaction.
81pub struct TransactionCommitError {
82 tr: Transaction,
83 err: FdbError,
84}
85
86impl TransactionCommitError {
87 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
88 /// which of the error codes generated by other `Transaction` functions represent temporary
89 /// error conditions and which represent application errors that should be handled by the
90 /// application. It also implements an exponential backoff strategy to avoid swamping the
91 /// database cluster with excessive retries when there is a high level of conflict between
92 /// transactions.
93 ///
94 /// You should not call this method most of the times and use `Database::transact` which
95 /// implements a retry loop strategy for you.
96 pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
97 FdbFuture::<()>::new(unsafe {
98 fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
99 })
100 .map_ok(|()| self.tr)
101 }
102
103 /// Reset the transaction to its initial state.
104 ///
105 /// This is similar to dropping the transaction and creating a new one.
106 pub fn reset(mut self) -> Transaction {
107 self.tr.reset();
108 self.tr
109 }
110}
111
112impl Deref for TransactionCommitError {
113 type Target = FdbError;
114 fn deref(&self) -> &FdbError {
115 &self.err
116 }
117}
118
119impl From<TransactionCommitError> for FdbError {
120 fn from(tce: TransactionCommitError) -> FdbError {
121 tce.err
122 }
123}
124
125impl fmt::Debug for TransactionCommitError {
126 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
127 write!(f, "TransactionCommitError({})", self.err)
128 }
129}
130
131impl fmt::Display for TransactionCommitError {
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 self.err.fmt(f)
134 }
135}
136
137impl std::error::Error for TransactionCommitError {}
138
139/// The result of `Transaction::Commit`
140type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
141
142/// A cancelled transaction
143#[derive(Debug)]
144#[repr(transparent)]
145pub struct TransactionCancelled {
146 tr: Transaction,
147}
148impl TransactionCancelled {
149 /// Reset the transaction to its initial state.
150 ///
151 /// This is similar to dropping the transaction and creating a new one.
152 pub fn reset(mut self) -> Transaction {
153 self.tr.reset();
154 self.tr
155 }
156}
157impl From<TransactionCancelled> for Transaction {
158 fn from(tc: TransactionCancelled) -> Transaction {
159 tc.reset()
160 }
161}
162
163/// In FoundationDB, a transaction is a mutable snapshot of a database.
164///
165/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
166///
167/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
168///
169/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
170///
171/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
172#[derive(Debug)]
173pub struct Transaction {
174 // Order of fields should not be changed, because Rust drops field top-to-bottom, and
175 // transaction should be dropped before cluster.
176 inner: NonNull<fdb_sys::FDBTransaction>,
177 metrics: Option<TransactionMetrics>,
178}
179unsafe impl Send for Transaction {}
180unsafe impl Sync for Transaction {}
181
182/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
183#[inline]
184fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
185 if v {
186 1
187 } else {
188 0
189 }
190}
191#[inline]
192fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
193 assert!(len <= i32::MAX as usize, "{context}.len() > i32::MAX");
194 len as i32
195}
196#[inline]
197fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
198 if iteration > i32::MAX as usize {
199 0 // this will cause client_invalid_operation
200 } else {
201 iteration as i32
202 }
203}
204#[inline]
205fn fdb_limit(v: usize) -> std::os::raw::c_int {
206 if v > i32::MAX as usize {
207 i32::MAX
208 } else {
209 v as i32
210 }
211}
212
213/// `RangeOption` represents a query parameters for range scan query.
214///
215/// You can construct `RangeOption` easily:
216///
217/// ```
218/// use foundationdb::RangeOption;
219///
220/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
221/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
222/// let opt = RangeOption {
223/// limit: Some(10),
224/// ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
225/// };
226/// ```
227#[derive(Debug, Clone)]
228pub struct RangeOption<'a> {
229 /// The beginning of the range.
230 pub begin: KeySelector<'a>,
231 /// The end of the range.
232 pub end: KeySelector<'a>,
233 /// If non-zero, indicates the maximum number of key-value pairs to return.
234 pub limit: Option<usize>,
235 /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
236 /// return for each item.
237 pub target_bytes: usize,
238 /// One of the options::StreamingMode values indicating how the caller would like the data in
239 /// the range returned.
240 pub mode: options::StreamingMode,
241 /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
242 /// the end of the range.
243 pub reverse: bool,
244 #[doc(hidden)]
245 pub __non_exhaustive: std::marker::PhantomData<()>,
246}
247
248impl RangeOption<'_> {
249 /// Reverses the range direction.
250 pub fn rev(mut self) -> Self {
251 self.reverse = !self.reverse;
252 self
253 }
254
255 pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
256 if !kvs.more() {
257 return None;
258 }
259
260 let last = kvs.last()?;
261 let last_key = last.key();
262
263 if let Some(limit) = self.limit.as_mut() {
264 *limit = limit.saturating_sub(kvs.len());
265 if *limit == 0 {
266 return None;
267 }
268 }
269
270 if self.reverse {
271 self.end.make_first_greater_or_equal(last_key);
272 } else {
273 self.begin.make_first_greater_than(last_key);
274 }
275 Some(self)
276 }
277
278 #[cfg_api_versions(min = 710)]
279 pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
280 if !kvs.more() {
281 return None;
282 }
283
284 let last = kvs.last()?;
285 let last_key = last.parent_key();
286
287 if let Some(limit) = self.limit.as_mut() {
288 *limit = limit.saturating_sub(kvs.len());
289 if *limit == 0 {
290 return None;
291 }
292 }
293
294 if self.reverse {
295 self.end.make_first_greater_or_equal(last_key);
296 } else {
297 self.begin.make_first_greater_than(last_key);
298 }
299 Some(self)
300 }
301}
302
303impl Default for RangeOption<'_> {
304 fn default() -> Self {
305 Self {
306 begin: KeySelector::first_greater_or_equal([].as_ref()),
307 end: KeySelector::first_greater_or_equal([].as_ref()),
308 limit: None,
309 target_bytes: 0,
310 mode: options::StreamingMode::Iterator,
311 reverse: false,
312 __non_exhaustive: std::marker::PhantomData,
313 }
314 }
315}
316
317impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
318 fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
319 Self {
320 begin,
321 end,
322 ..Self::default()
323 }
324 }
325}
326impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
327 fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
328 Self {
329 begin: KeySelector::first_greater_or_equal(begin),
330 end: KeySelector::first_greater_or_equal(end),
331 ..Self::default()
332 }
333 }
334}
335impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
336 fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
337 Self {
338 begin: KeySelector::first_greater_or_equal(begin),
339 end: KeySelector::first_greater_or_equal(end),
340 ..Self::default()
341 }
342 }
343}
344impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
345 fn from(range: Range<KeySelector<'a>>) -> Self {
346 RangeOption::from((range.start, range.end))
347 }
348}
349
350impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
351 fn from(range: Range<&'a [u8]>) -> Self {
352 RangeOption::from((range.start, range.end))
353 }
354}
355
356impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
357 fn from(range: Range<Vec<u8>>) -> Self {
358 RangeOption::from((range.start, range.end))
359 }
360}
361
362impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
363 fn from(range: RangeInclusive<&'a [u8]>) -> Self {
364 let (start, end) = range.into_inner();
365 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
366 }
367}
368
369impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
370 fn from(range: RangeInclusive<Vec<u8>>) -> Self {
371 let (start, end) = range.into_inner();
372 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
373 }
374}
375
376impl Transaction {
377 pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
378 Self {
379 inner,
380 metrics: None,
381 }
382 }
383
384 pub fn new_instrumented(
385 inner: NonNull<fdb_sys::FDBTransaction>,
386 metrics: TransactionMetrics,
387 ) -> Self {
388 Self {
389 inner,
390 metrics: Some(metrics),
391 }
392 }
393
394 /// Called to set an option on an FDBTransaction.
395 pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
396 unsafe { opt.apply(self.inner.as_ptr()) }
397 }
398
399 /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
400 /// where the code and data will be provided as raw, in order to avoid deserializing to an option
401 /// and serializing it back to code and data.
402 /// In general, you should use `set_option`.
403 pub fn set_raw_option(
404 &self,
405 code: fdb_sys::FDBTransactionOption,
406 data: Option<Vec<u8>>,
407 ) -> FdbResult<()> {
408 let (data_ptr, size) = data
409 .as_ref()
410 .map(|data| {
411 (
412 data.as_ptr(),
413 i32::try_from(data.len()).expect("len to fit in i32"),
414 )
415 })
416 .unwrap_or_else(|| (std::ptr::null(), 0));
417 let err = unsafe {
418 fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
419 };
420 if err != 0 {
421 Err(FdbError::from_code(err))
422 } else {
423 Ok(())
424 }
425 }
426
427 /// Modify the database snapshot represented by transaction to change the given
428 /// key to have the given value.
429 ///
430 /// If the given key was not previously present in the database it is inserted.
431 /// The modification affects the actual database only if transaction is later
432 /// committed with `Transaction::commit`.
433 ///
434 /// # Arguments
435 ///
436 /// * `key` - the name of the key to be inserted into the database.
437 /// * `value` - the value to be inserted into the database
438 #[cfg_attr(
439 feature = "trace",
440 tracing::instrument(level = "debug", skip(self, key, value))
441 )]
442 pub fn set(&self, key: &[u8], value: &[u8]) {
443 unsafe {
444 fdb_sys::fdb_transaction_set(
445 self.inner.as_ptr(),
446 key.as_ptr(),
447 fdb_len(key.len(), "key"),
448 value.as_ptr(),
449 fdb_len(value.len(), "value"),
450 )
451 }
452
453 if let Some(metrics) = &self.metrics {
454 metrics.report_metrics(FdbCommand::Set((key.len() + value.len()) as u64));
455 }
456 }
457
458 /// Modify the database snapshot represented by transaction to remove the given key from the
459 /// database.
460 ///
461 /// If the key was not previously present in the database, there is no effect. The modification
462 /// affects the actual database only if transaction is later committed with
463 /// `Transaction::commit`.
464 ///
465 /// # Arguments
466 ///
467 /// * `key` - the name of the key to be removed from the database.
468 #[cfg_attr(
469 feature = "trace",
470 tracing::instrument(level = "debug", skip(self, key))
471 )]
472 pub fn clear(&self, key: &[u8]) {
473 unsafe {
474 fdb_sys::fdb_transaction_clear(
475 self.inner.as_ptr(),
476 key.as_ptr(),
477 fdb_len(key.len(), "key"),
478 )
479 }
480
481 if let Some(metrics) = &self.metrics {
482 metrics.report_metrics(FdbCommand::Clear);
483 }
484 }
485
486 /// Reads a value from the database snapshot represented by transaction.
487 ///
488 /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
489 ///
490 /// # Arguments
491 ///
492 /// * `key` - the name of the key to be looked up in the database
493 /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
494 #[cfg_attr(
495 feature = "trace",
496 tracing::instrument(level = "debug", skip(self, key))
497 )]
498 pub fn get(
499 &self,
500 key: &[u8],
501 snapshot: bool,
502 ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
503 let metrics = self.metrics.clone();
504 let lenght_key = key.len();
505
506 FdbFuture::<Option<FdbSlice>>::new(unsafe {
507 fdb_sys::fdb_transaction_get(
508 self.inner.as_ptr(),
509 key.as_ptr(),
510 fdb_len(key.len(), "key"),
511 fdb_bool(snapshot),
512 )
513 })
514 .map(move |result| {
515 if let Ok(value) = &result {
516 if let Some(metrics) = metrics.as_ref() {
517 let (bytes_count, kv_fetched) = if let Some(values) = value {
518 ((lenght_key + values.len()) as u64, 1)
519 } else {
520 (lenght_key as u64, 0)
521 };
522 metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
523 }
524 }
525 result
526 })
527 }
528
529 /// Modify the database snapshot represented by transaction to perform the operation indicated
530 /// by operationType with operand param to the value stored by the given key.
531 ///
532 /// An atomic operation is a single database command that carries out several logical steps:
533 /// reading the value of a key, performing a transformation on that value, and writing the
534 /// result. Different atomic operations perform different transformations. Like other database
535 /// operations, an atomic operation is used within a transaction; however, its use within a
536 /// transaction will not cause the transaction to conflict.
537 ///
538 /// Atomic operations do not expose the current value of the key to the client but simply send
539 /// the database the transformation to apply. In regard to conflict checking, an atomic
540 /// operation is equivalent to a write without a read. It can only cause other transactions
541 /// performing reads of the key to conflict.
542 ///
543 /// By combining these logical steps into a single, read-free operation, FoundationDB can
544 /// guarantee that the transaction will not conflict due to the operation. This makes atomic
545 /// operations ideal for operating on keys that are frequently modified. A common example is
546 /// the use of a key-value pair as a counter.
547 ///
548 /// # Warning
549 ///
550 /// If a transaction uses both an atomic operation and a strictly serializable read on the same
551 /// key, the benefits of using the atomic operation (for both conflict checking and performance)
552 /// are lost.
553 #[cfg_attr(
554 feature = "trace",
555 tracing::instrument(level = "debug", skip(self, key, param))
556 )]
557 pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
558 unsafe {
559 fdb_sys::fdb_transaction_atomic_op(
560 self.inner.as_ptr(),
561 key.as_ptr(),
562 fdb_len(key.len(), "key"),
563 param.as_ptr(),
564 fdb_len(param.len(), "param"),
565 op_type.code(),
566 )
567 }
568
569 if let Some(metrics) = &self.metrics {
570 metrics.report_metrics(FdbCommand::Atomic);
571 }
572 }
573
574 /// Resolves a key selector against the keys in the database snapshot represented by
575 /// transaction.
576 ///
577 /// Returns an FDBFuture which will be set to the key in the database matching the key
578 /// selector.
579 ///
580 /// # Arguments
581 ///
582 /// * `selector`: the key selector
583 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
584 #[cfg_attr(
585 feature = "trace",
586 tracing::instrument(level = "debug", skip(self, selector, snapshot))
587 )]
588 pub fn get_key(
589 &self,
590 selector: &KeySelector,
591 snapshot: bool,
592 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
593 let key = selector.key();
594 FdbFuture::new(unsafe {
595 fdb_sys::fdb_transaction_get_key(
596 self.inner.as_ptr(),
597 key.as_ptr(),
598 fdb_len(key.len(), "key"),
599 fdb_bool(selector.or_equal()),
600 selector.offset(),
601 fdb_bool(snapshot),
602 )
603 })
604 }
605
606 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
607 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
608 /// equal to the key resolved by the begin key selector and lexicographically less than the key
609 /// resolved by the end key selector.
610 ///
611 /// Returns a stream of KeyValue slices.
612 ///
613 /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
614 /// use.
615 ///
616 /// # Arguments
617 ///
618 /// * `opt`: the range, limit, target_bytes and mode
619 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
620 #[cfg_attr(
621 feature = "trace",
622 tracing::instrument(level = "debug", skip(self, opt, snapshot))
623 )]
624 pub fn get_ranges<'a>(
625 &'a self,
626 opt: RangeOption<'a>,
627 snapshot: bool,
628 ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
629 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
630 if let Some(opt) = maybe_opt {
631 Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
632 move |maybe_values| {
633 let next_opt = match &maybe_values {
634 Ok(values) => opt.next_range(values),
635 Err(..) => None,
636 };
637 Some((maybe_values, (iteration + 1, next_opt)))
638 },
639 ))
640 } else {
641 Either::Right(future::ready(None))
642 }
643 })
644 }
645
646 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
647 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
648 /// equal to the key resolved by the begin key selector and lexicographically less than the key
649 /// resolved by the end key selector.
650 ///
651 /// Returns a stream of KeyValue.
652 ///
653 /// # Arguments
654 ///
655 /// * `opt`: the range, limit, target_bytes and mode
656 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
657 #[cfg_attr(
658 feature = "trace",
659 tracing::instrument(level = "debug", skip(self, opt, snapshot))
660 )]
661 pub fn get_ranges_keyvalues<'a>(
662 &'a self,
663 opt: RangeOption<'a>,
664 snapshot: bool,
665 ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
666 self.get_ranges(opt, snapshot)
667 .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
668 .try_flatten()
669 }
670
671 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
672 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
673 /// equal to the key resolved by the begin key selector and lexicographically less than the key
674 /// resolved by the end key selector.
675 ///
676 /// # Arguments
677 ///
678 /// * `opt`: the range, limit, target_bytes and mode
679 /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
680 /// by 1 for each successive call while reading this range. In all other cases it is ignored.
681 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
682 #[cfg_attr(
683 feature = "trace",
684 tracing::instrument(level = "debug", skip(self, opt, snapshot))
685 )]
686 pub fn get_range(
687 &self,
688 opt: &RangeOption,
689 iteration: usize,
690 snapshot: bool,
691 ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
692 let begin = &opt.begin;
693 let end = &opt.end;
694 let key_begin = begin.key();
695 let key_end = end.key();
696
697 let metrics = self.metrics.clone();
698
699 FdbFuture::<FdbValues>::new(unsafe {
700 fdb_sys::fdb_transaction_get_range(
701 self.inner.as_ptr(),
702 key_begin.as_ptr(),
703 fdb_len(key_begin.len(), "key_begin"),
704 fdb_bool(begin.or_equal()),
705 begin.offset(),
706 key_end.as_ptr(),
707 fdb_len(key_end.len(), "key_end"),
708 fdb_bool(end.or_equal()),
709 end.offset(),
710 fdb_limit(opt.limit.unwrap_or(0)),
711 fdb_limit(opt.target_bytes),
712 opt.mode.code(),
713 fdb_iteration(iteration),
714 fdb_bool(snapshot),
715 fdb_bool(opt.reverse),
716 )
717 })
718 .map(move |result| {
719 if let (Ok(values), Some(metrics)) = (&result, metrics) {
720 let kv_fetched = values.len();
721 let mut bytes_count = 0;
722
723 for key_value in values.as_ref() {
724 let key_len = key_value.key().len();
725 let value_len = key_value.value().len();
726
727 bytes_count += (key_len + value_len) as u64
728 }
729
730 metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched as u64));
731 };
732
733 result
734 })
735 }
736
737 /// Mapped Range is an experimental feature introduced in FDB 7.1.
738 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
739 /// In such a case, querying records by scanning an index in relational databases can be
740 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
741 ///
742 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
743 /// this can happen in one request without additional back and forth. Considering the overhead
744 /// of each request, this saves time and resources on serialization, deserialization, and network.
745 ///
746 /// A mapped request will:
747 ///
748 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
749 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
750 /// * Put all results in a nested structure and return them.
751 ///
752 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
753 /// using snapshot isolation AND disabling read-your-writes.
754 ///
755 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
756 ///
757 /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
758 #[cfg_api_versions(min = 710)]
759 #[cfg_attr(
760 feature = "trace",
761 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
762 )]
763 pub fn get_mapped_range(
764 &self,
765 opt: &RangeOption,
766 mapper: &[u8],
767 iteration: usize,
768 snapshot: bool,
769 ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
770 let begin = &opt.begin;
771 let end = &opt.end;
772 let key_begin = begin.key();
773 let key_end = end.key();
774
775 FdbFuture::new(unsafe {
776 fdb_sys::fdb_transaction_get_mapped_range(
777 self.inner.as_ptr(),
778 key_begin.as_ptr(),
779 fdb_len(key_begin.len(), "key_begin"),
780 fdb_bool(begin.or_equal()),
781 begin.offset(),
782 key_end.as_ptr(),
783 fdb_len(key_end.len(), "key_end"),
784 fdb_bool(end.or_equal()),
785 end.offset(),
786 mapper.as_ptr(),
787 fdb_len(mapper.len(), "mapper_length"),
788 fdb_limit(opt.limit.unwrap_or(0)),
789 fdb_limit(opt.target_bytes),
790 opt.mode.code(),
791 fdb_iteration(iteration),
792 fdb_bool(snapshot),
793 fdb_bool(opt.reverse),
794 )
795 })
796 }
797
798 /// Mapped Range is an experimental feature introduced in FDB 7.1.
799 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
800 /// In such a case, querying records by scanning an index in relational databases can be
801 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
802 ///
803 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
804 /// this can happen in one request without additional back and forth. Considering the overhead
805 /// of each request, this saves time and resources on serialization, deserialization, and network.
806 ///
807 /// A mapped request will:
808 ///
809 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
810 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
811 /// * Put all results in a nested structure and return them.
812 ///
813 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
814 /// using snapshot isolation AND disabling read-your-writes.
815 ///
816 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
817 #[cfg_api_versions(min = 710)]
818 #[cfg_attr(
819 feature = "trace",
820 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
821 )]
822 pub fn get_mapped_ranges<'a>(
823 &'a self,
824 opt: RangeOption<'a>,
825 mapper: &'a [u8],
826 snapshot: bool,
827 ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
828 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
829 if let Some(opt) = maybe_opt {
830 Either::Left(
831 self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
832 .map(move |maybe_values| {
833 let next_opt = match &maybe_values {
834 Ok(values) => opt.next_mapped_range(values),
835 Err(..) => None,
836 };
837 Some((maybe_values, (iteration + 1, next_opt)))
838 }),
839 )
840 } else {
841 Either::Right(future::ready(None))
842 }
843 })
844 }
845
846 /// Modify the database snapshot represented by transaction to remove all keys (if any) which
847 /// are lexicographically greater than or equal to the given begin key and lexicographically
848 /// less than the given end_key.
849 ///
850 /// The modification affects the actual database only if transaction is later committed with
851 /// `Transaction::commit`.
852 #[cfg_attr(
853 feature = "trace",
854 tracing::instrument(level = "debug", skip(self, begin, end))
855 )]
856 pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
857 unsafe {
858 fdb_sys::fdb_transaction_clear_range(
859 self.inner.as_ptr(),
860 begin.as_ptr(),
861 fdb_len(begin.len(), "begin"),
862 end.as_ptr(),
863 fdb_len(end.len(), "end"),
864 )
865 }
866
867 if let Some(metrics) = &self.metrics {
868 metrics.report_metrics(FdbCommand::ClearRange);
869 }
870 }
871
872 /// Get the estimated byte size of the key range based on the byte sample collected by FDB
873 #[cfg_api_versions(min = 630)]
874 pub fn get_estimated_range_size_bytes(
875 &self,
876 begin: &[u8],
877 end: &[u8],
878 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
879 FdbFuture::<i64>::new(unsafe {
880 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
881 self.inner.as_ptr(),
882 begin.as_ptr(),
883 fdb_len(begin.len(), "begin"),
884 end.as_ptr(),
885 fdb_len(end.len(), "end"),
886 )
887 })
888 }
889
890 /// Attempts to commit the sets and clears previously applied to the database snapshot
891 /// represented by transaction to the actual database.
892 ///
893 /// The commit may or may not succeed – in particular, if a conflicting transaction previously
894 /// committed, then the commit must fail in order to preserve transactional isolation. If the
895 /// commit does succeed, the transaction is durably committed to the database and all
896 /// subsequently started transactions will observe its effects.
897 ///
898 /// It is not necessary to commit a read-only transaction – you can simply drop it.
899 ///
900 /// Callers will usually want to retry a transaction if the commit or a another method on the
901 /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
902 ///
903 /// As with other client/server databases, in some failure scenarios a client may be unable to
904 /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
905 /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
906 /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
907 /// could execute the transaction twice. In these cases, you must consider the idempotence of
908 /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
909 ///
910 /// Normally, commit will wait for outstanding reads to return. However, if those reads were
911 /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
912 /// any outstanding reads will immediately return errors.
913 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
914 pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
915 FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
916 move |r| match r {
917 Ok(()) => {
918 #[cfg(feature = "trace")]
919 tracing::info!("transaction committed");
920
921 Ok(TransactionCommitted { tr: self })
922 }
923 Err(err) => {
924 #[cfg(feature = "trace")]
925 {
926 let error_code = err.code();
927 tracing::error!(error_code, "could not commit");
928 }
929
930 Err(TransactionCommitError { tr: self, err })
931 }
932 },
933 )
934 }
935
936 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
937 /// which of the error codes generated by other `Transaction` functions represent temporary
938 /// error conditions and which represent application errors that should be handled by the
939 /// application. It also implements an exponential backoff strategy to avoid swamping the
940 /// database cluster with excessive retries when there is a high level of conflict between
941 /// transactions.
942 ///
943 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
944 /// transaction has already been reset.
945 ///
946 /// You should not call this method most of the times and use `Database::transact` which
947 /// implements a retry loop strategy for you.
948 pub fn on_error(
949 self,
950 err: FdbError,
951 ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
952 FdbFuture::<()>::new(unsafe {
953 fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
954 })
955 .map_ok(|()| self)
956 }
957
958 /// Cancels the transaction. All pending or future uses of the transaction will return a
959 /// transaction_cancelled error. The transaction can be used again after it is reset.
960 pub fn cancel(self) -> TransactionCancelled {
961 unsafe {
962 fdb_sys::fdb_transaction_cancel(self.inner.as_ptr());
963 }
964 TransactionCancelled { tr: self }
965 }
966
967 /// Sets a custom metric for the transaction with the specified name, value, and labels.
968 ///
969 /// Custom metrics allow you to track application-specific metrics during transaction execution.
970 /// These metrics are collected alongside the standard FoundationDB metrics and can be used
971 /// for monitoring, debugging, and performance analysis.
972 ///
973 /// # Arguments
974 /// * `name` - The name of the metric (e.g., "query_time", "cache_hits")
975 /// * `value` - The value to set for the metric
976 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
977 /// (e.g., `[("operation", "read"), ("region", "us-west")]`)
978 ///
979 /// # Returns
980 /// * `Ok(())` if the metric was set successfully
981 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
982 ///
983 /// # Example
984 /// ```
985 /// # use foundationdb::*;
986 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
987 /// # let db = Database::default()?;
988 /// let metrics = TransactionMetrics::new();
989 /// let txn = db.create_instrumented_trx(metrics)?;
990 ///
991 /// // Set a custom metric with labels
992 /// txn.set_custom_metric("query_processing_time", 42, &[("query_type", "select"), ("table", "users")])?;
993 /// # Ok(())
994 /// # }
995 /// ```
996 ///
997 /// # Note
998 /// This method only works on transactions created with `create_instrumented_trx()` or within
999 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1000 /// return `Err(TransactionMetricsNotFound)`.
1001 pub fn set_custom_metric(
1002 &self,
1003 name: &str,
1004 value: u64,
1005 labels: &[(&str, &str)],
1006 ) -> Result<(), TransactionMetricsNotFound> {
1007 if let Some(metrics) = &self.metrics {
1008 metrics.set_custom(name, value, labels);
1009 Ok(())
1010 } else {
1011 Err(TransactionMetricsNotFound)
1012 }
1013 }
1014
1015 /// Increments a custom metric for the transaction by the specified amount.
1016 ///
1017 /// This is useful for counting events or accumulating values during transaction execution.
1018 /// If the metric doesn't exist yet, it will be created with the specified amount.
1019 /// If it already exists with the same labels, its value will be incremented.
1020 ///
1021 /// # Arguments
1022 /// * `name` - The name of the metric to increment (e.g., "requests", "bytes_processed")
1023 /// * `amount` - The amount to increment the metric by
1024 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1025 /// (e.g., `[("status", "success"), ("endpoint", "api/v1/users")]`)
1026 ///
1027 /// # Returns
1028 /// * `Ok(())` if the metric was incremented successfully
1029 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1030 ///
1031 /// # Example
1032 /// ```
1033 /// # use foundationdb::*;
1034 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1035 /// # let db = Database::default()?;
1036 /// let metrics = TransactionMetrics::new();
1037 /// let txn = db.create_instrumented_trx(metrics)?;
1038 ///
1039 /// // Increment a counter each time a specific operation occurs
1040 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1041 ///
1042 /// // Later in the transaction, increment it again
1043 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1044 ///
1045 /// // The final value will be 2
1046 /// # Ok(())
1047 /// # }
1048 /// ```
1049 ///
1050 /// # Note
1051 /// This method only works on transactions created with `create_instrumented_trx()` or within
1052 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1053 /// return `Err(TransactionMetricsNotFound)`.
1054 pub fn increment_custom_metric(
1055 &self,
1056 name: &str,
1057 amount: u64,
1058 labels: &[(&str, &str)],
1059 ) -> Result<(), TransactionMetricsNotFound> {
1060 if let Some(metrics) = &self.metrics {
1061 metrics.increment_custom(name, amount, labels);
1062 Ok(())
1063 } else {
1064 Err(TransactionMetricsNotFound)
1065 }
1066 }
1067
1068 /// Returns a list of public network addresses as strings, one for each of the storage servers
1069 /// responsible for storing key_name and its associated value.
1070 pub fn get_addresses_for_key(
1071 &self,
1072 key: &[u8],
1073 ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
1074 FdbFuture::new(unsafe {
1075 fdb_sys::fdb_transaction_get_addresses_for_key(
1076 self.inner.as_ptr(),
1077 key.as_ptr(),
1078 fdb_len(key.len(), "key"),
1079 )
1080 })
1081 }
1082
1083 /// A watch's behavior is relative to the transaction that created it. A watch will report a
1084 /// change in relation to the key’s value as readable by that transaction. The initial value
1085 /// used for comparison is either that of the transaction’s read version or the value as
1086 /// modified by the transaction itself prior to the creation of the watch. If the value changes
1087 /// and then changes back to its initial value, the watch might not report the change.
1088 ///
1089 /// Until the transaction that created it has been committed, a watch will not report changes
1090 /// made by other transactions. In contrast, a watch will immediately report changes made by
1091 /// the transaction itself. Watches cannot be created if the transaction has set the
1092 /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
1093 /// watches_disabled error.
1094 ///
1095 /// If the transaction used to create a watch encounters an error during commit, then the watch
1096 /// will be set with that error. A transaction whose commit result is unknown will set all of
1097 /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
1098 /// destroyed, then any watches it created will be set with the transaction_cancelled error.
1099 ///
1100 /// Returns an future representing an empty value that will be set once the watch has
1101 /// detected a change to the value at the specified key.
1102 ///
1103 /// By default, each database connection can have no more than 10,000 watches that have not yet
1104 /// reported a change. When this number is exceeded, an attempt to create a watch will return a
1105 /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
1106 /// Because a watch outlives the transaction that creates it, any watch that is no longer
1107 /// needed should be cancelled by dropping its future.
1108 pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
1109 FdbFuture::new(unsafe {
1110 fdb_sys::fdb_transaction_watch(
1111 self.inner.as_ptr(),
1112 key.as_ptr(),
1113 fdb_len(key.len(), "key"),
1114 )
1115 })
1116 }
1117
1118 /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
1119 /// returned future, which is the summation of the estimated size of mutations, read conflict
1120 /// ranges, and write conflict ranges.
1121 ///
1122 /// This can be called multiple times before the transaction is committed.
1123 #[cfg_api_versions(min = 620)]
1124 pub fn get_approximate_size(
1125 &self,
1126 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1127 FdbFuture::new(unsafe {
1128 fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
1129 })
1130 }
1131
1132 /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
1133 /// Note: the returned split points contain the start key and end key of the given range.
1134 #[cfg_api_versions(min = 700)]
1135 pub fn get_range_split_points(
1136 &self,
1137 begin: &[u8],
1138 end: &[u8],
1139 chunk_size: i64,
1140 ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
1141 FdbFuture::<FdbKeys>::new(unsafe {
1142 fdb_sys::fdb_transaction_get_range_split_points(
1143 self.inner.as_ptr(),
1144 begin.as_ptr(),
1145 fdb_len(begin.len(), "begin"),
1146 end.as_ptr(),
1147 fdb_len(end.len(), "end"),
1148 chunk_size,
1149 )
1150 })
1151 }
1152
1153 /// Returns an FDBFuture which will be set to the versionstamp which was used by any
1154 /// versionstamp operations in this transaction.
1155 ///
1156 /// The future will be ready only after the successful completion of a call to `commit()` on
1157 /// this Transaction. Read-only transactions do not modify the database when committed and will
1158 /// result in the future completing with an error. Keep in mind that a transaction which reads
1159 /// keys and then sets them to their current values may be optimized to a read-only transaction.
1160 ///
1161 /// Most applications will not call this function.
1162 pub fn get_versionstamp(
1163 &self,
1164 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
1165 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
1166 }
1167
1168 /// The transaction obtains a snapshot read version automatically at the time of the first call
1169 /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
1170 /// compromised by transaction options) is guaranteed to represent all transactions which were
1171 /// reported committed before that call.
1172 pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1173 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
1174 }
1175
1176 /// Sets the snapshot read version used by a transaction.
1177 ///
1178 /// This is not needed in simple cases.
1179 /// If the given version is too old, subsequent reads will fail with error_code_past_version;
1180 /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
1181 /// error_code_future_version. If any of get_*() have been called on this transaction already,
1182 /// the result is undefined.
1183 pub fn set_read_version(&self, version: i64) {
1184 unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
1185 }
1186
1187 /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
1188 /// The value of this key is sent to clients along with the read version from the proxy,
1189 /// so a client can read its value without communicating with a storage server.
1190 /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
1191 #[cfg_api_versions(min = 610)]
1192 pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
1193 match self.get(METADATA_VERSION_KEY, snapshot).await {
1194 Ok(Some(fdb_slice)) => {
1195 let value = fdb_slice.deref();
1196 // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
1197 // can we assume that it will always be the correct size?
1198 if value.len() < 8 {
1199 return Ok(None);
1200 }
1201
1202 // The 80-bits versionstamps are 10 bytes longs, and are composed of:
1203 // * 8 bytes (Transaction Version)
1204 // * followed by 2 bytes (Transaction Batch Order)
1205 // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
1206 let mut arr = [0u8; 8];
1207 arr.copy_from_slice(&value[0..8]);
1208 let transaction_version: i64 = i64::from_be_bytes(arr);
1209
1210 Ok(Some(transaction_version))
1211 }
1212 Ok(None) => Ok(None),
1213
1214 Err(err) => Err(err),
1215 }
1216 }
1217
1218 #[cfg_api_versions(min = 610)]
1219 pub fn update_metadata_version(&self) {
1220 // The param is transformed by removing the final four bytes from ``param`` and reading
1221 // those as a little-Endian 32-bit integer to get a position ``pos``.
1222 // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
1223 // versionstamp of the transaction used. The first byte of the parameter is position 0.
1224 // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
1225 let param = vec![0u8; 14];
1226 self.atomic_op(
1227 METADATA_VERSION_KEY,
1228 param.as_slice(),
1229 options::MutationType::SetVersionstampedValue,
1230 )
1231 }
1232
1233 /// Reset transaction to its initial state.
1234 ///
1235 /// In order to protect against a race condition with cancel(), this call require a mutable
1236 /// access to the transaction.
1237 ///
1238 /// This is similar to dropping the transaction and creating a new one.
1239 ///
1240 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1241 /// transaction has already been reset.
1242 pub fn reset(&mut self) {
1243 unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1244 }
1245
1246 /// Adds a conflict range to a transaction without performing the associated read or write.
1247 ///
1248 /// # Note
1249 ///
1250 /// Most applications will use the serializable isolation that transactions provide by default
1251 /// and will not need to manipulate conflict ranges.
1252 pub fn add_conflict_range(
1253 &self,
1254 begin: &[u8],
1255 end: &[u8],
1256 ty: options::ConflictRangeType,
1257 ) -> FdbResult<()> {
1258 error::eval(unsafe {
1259 fdb_sys::fdb_transaction_add_conflict_range(
1260 self.inner.as_ptr(),
1261 begin.as_ptr(),
1262 fdb_len(begin.len(), "begin"),
1263 end.as_ptr(),
1264 fdb_len(end.len(), "end"),
1265 ty.code(),
1266 )
1267 })
1268 }
1269}
1270
1271impl Drop for Transaction {
1272 fn drop(&mut self) {
1273 unsafe {
1274 fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1275 }
1276 }
1277}
1278
1279/// A retryable transaction, generated by Database.run
1280#[derive(Clone)]
1281pub struct RetryableTransaction {
1282 inner: Arc<Transaction>,
1283}
1284
1285impl Deref for RetryableTransaction {
1286 type Target = Transaction;
1287 fn deref(&self) -> &Transaction {
1288 self.inner.deref()
1289 }
1290}
1291
1292impl RetryableTransaction {
1293 pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1294 RetryableTransaction { inner: Arc::new(t) }
1295 }
1296
1297 pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1298 // checking weak references
1299 if Arc::weak_count(&self.inner) != 0 {
1300 return Err(FdbBindingError::ReferenceToTransactionKept);
1301 }
1302 Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1303 }
1304
1305 pub(crate) async fn on_error(
1306 self,
1307 err: FdbError,
1308 ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1309 Ok(self
1310 .take()?
1311 .on_error(err)
1312 .await
1313 .map(RetryableTransaction::new))
1314 }
1315
1316 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1317 pub(crate) async fn commit(
1318 self,
1319 ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1320 Ok(self.take()?.commit().await)
1321 }
1322}