// foundationdb/transaction.rs
// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! Implementations of the FDBTransaction C API
//!
//! <https://apple.github.io/foundationdb/api-c.html#transaction>

13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::options;
22
23use crate::{error, FdbError, FdbResult};
24use foundationdb_macros::cfg_api_versions;
25
26use crate::error::FdbBindingError;
27
28use futures::{
29 future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
30};
31
/// Raw bytes of the `\xff/metadataVersion` key.
///
/// Only compiled for API versions 610 and above (see the `cfg_api_versions` attribute).
// NOTE(review): no use of this constant is visible in this part of the file; presumably it is
// consumed by code further down — confirm.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
34
35/// A committed transaction.
36#[derive(Debug)]
37#[repr(transparent)]
38pub struct TransactionCommitted {
39 tr: Transaction,
40}
41
42impl TransactionCommitted {
43 /// Retrieves the database version number at which a given transaction was committed.
44 ///
45 /// Read-only transactions do not modify the database when committed and will have a committed
46 /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
47 /// current values may be optimized to a read-only transaction.
48 ///
49 /// Note that database versions are not necessarily unique to a given transaction and so cannot
50 /// be used to determine in what order two transactions completed. The only use for this
51 /// function is to manually enforce causal consistency when calling `set_read_version()` on
52 /// another subsequent transaction.
53 ///
54 /// Most applications will not call this function.
55 pub fn committed_version(&self) -> FdbResult<i64> {
56 let mut version: i64 = 0;
57 error::eval(unsafe {
58 fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
59 })?;
60 Ok(version)
61 }
62
63 /// Reset the transaction to its initial state.
64 ///
65 /// This will not affect previously committed data.
66 ///
67 /// This is similar to dropping the transaction and creating a new one.
68 pub fn reset(mut self) -> Transaction {
69 self.tr.reset();
70 self.tr
71 }
72}
73impl From<TransactionCommitted> for Transaction {
74 fn from(tc: TransactionCommitted) -> Transaction {
75 tc.reset()
76 }
77}
78
79/// A failed to commit transaction.
80pub struct TransactionCommitError {
81 tr: Transaction,
82 err: FdbError,
83}
84
85impl TransactionCommitError {
86 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
87 /// which of the error codes generated by other `Transaction` functions represent temporary
88 /// error conditions and which represent application errors that should be handled by the
89 /// application. It also implements an exponential backoff strategy to avoid swamping the
90 /// database cluster with excessive retries when there is a high level of conflict between
91 /// transactions.
92 ///
93 /// You should not call this method most of the times and use `Database::transact` which
94 /// implements a retry loop strategy for you.
95 pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
96 FdbFuture::<()>::new(unsafe {
97 fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
98 })
99 .map_ok(|()| self.tr)
100 }
101
102 /// Reset the transaction to its initial state.
103 ///
104 /// This is similar to dropping the transaction and creating a new one.
105 pub fn reset(mut self) -> Transaction {
106 self.tr.reset();
107 self.tr
108 }
109}
110
111impl Deref for TransactionCommitError {
112 type Target = FdbError;
113 fn deref(&self) -> &FdbError {
114 &self.err
115 }
116}
117
118impl From<TransactionCommitError> for FdbError {
119 fn from(tce: TransactionCommitError) -> FdbError {
120 tce.err
121 }
122}
123
124impl fmt::Debug for TransactionCommitError {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 write!(f, "TransactionCommitError({})", self.err)
127 }
128}
129
130impl fmt::Display for TransactionCommitError {
131 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132 self.err.fmt(f)
133 }
134}
135
136impl std::error::Error for TransactionCommitError {}
137
/// The result of `Transaction::commit`: the committed transaction on success, or the
/// transaction bundled with the commit error on failure.
type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
140
141/// A cancelled transaction
142#[derive(Debug)]
143#[repr(transparent)]
144pub struct TransactionCancelled {
145 tr: Transaction,
146}
147impl TransactionCancelled {
148 /// Reset the transaction to its initial state.
149 ///
150 /// This is similar to dropping the transaction and creating a new one.
151 pub fn reset(mut self) -> Transaction {
152 self.tr.reset();
153 self.tr
154 }
155}
156impl From<TransactionCancelled> for Transaction {
157 fn from(tc: TransactionCancelled) -> Transaction {
158 tc.reset()
159 }
160}
161
/// In FoundationDB, a transaction is a mutable snapshot of a database.
///
/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
///
/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
///
/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
///
/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
#[derive(Debug)]
pub struct Transaction {
    // Order of fields should not be changed, because Rust drops fields top-to-bottom, and
    // transaction should be dropped before cluster.
    // NOTE(review): only one field is declared here, so the ordering constraint above looks
    // like a leftover from an earlier layout — confirm before relying on it.
    //
    // Non-null pointer to the underlying C `FDBTransaction` handle.
    inner: NonNull<fdb_sys::FDBTransaction>,
}
// NOTE(review): these impls assert that the raw handle may be moved to and shared between
// threads; presumably this relies on the FoundationDB C client's threading guarantees — confirm.
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
179
180/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
181#[inline]
182fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
183 if v {
184 1
185 } else {
186 0
187 }
188}
/// Converts a buffer length into the `c_int` expected by the C API.
///
/// `context` names the buffer being measured and is only used in the panic message.
///
/// # Panics
///
/// Panics if `len` is greater than `i32::MAX`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    match i32::try_from(len) {
        Ok(fitting) => fitting,
        Err(_) => panic!("{context}.len() > i32::MAX"),
    }
}
/// Converts an iteration counter into the `c_int` expected by the C API.
///
/// Counters that do not fit in an `i32` are mapped to 0, which will cause
/// client_invalid_operation.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a limit into the `c_int` expected by the C API, clamping values that do not fit
/// to `i32::MAX`.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    i32::try_from(v).unwrap_or(i32::MAX)
}
210
211/// `RangeOption` represents a query parameters for range scan query.
212///
213/// You can construct `RangeOption` easily:
214///
215/// ```
216/// use foundationdb::RangeOption;
217///
218/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
219/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
220/// let opt = RangeOption {
221/// limit: Some(10),
222/// ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
223/// };
224/// ```
225#[derive(Debug, Clone)]
226pub struct RangeOption<'a> {
227 /// The beginning of the range.
228 pub begin: KeySelector<'a>,
229 /// The end of the range.
230 pub end: KeySelector<'a>,
231 /// If non-zero, indicates the maximum number of key-value pairs to return.
232 pub limit: Option<usize>,
233 /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
234 /// return for each item.
235 pub target_bytes: usize,
236 /// One of the options::StreamingMode values indicating how the caller would like the data in
237 /// the range returned.
238 pub mode: options::StreamingMode,
239 /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
240 /// the end of the range.
241 pub reverse: bool,
242 #[doc(hidden)]
243 pub __non_exhaustive: std::marker::PhantomData<()>,
244}
245
246impl RangeOption<'_> {
247 /// Reverses the range direction.
248 pub fn rev(mut self) -> Self {
249 self.reverse = !self.reverse;
250 self
251 }
252
253 pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
254 if !kvs.more() {
255 return None;
256 }
257
258 let last = kvs.last()?;
259 let last_key = last.key();
260
261 if let Some(limit) = self.limit.as_mut() {
262 *limit = limit.saturating_sub(kvs.len());
263 if *limit == 0 {
264 return None;
265 }
266 }
267
268 if self.reverse {
269 self.end.make_first_greater_or_equal(last_key);
270 } else {
271 self.begin.make_first_greater_than(last_key);
272 }
273 Some(self)
274 }
275
276 #[cfg_api_versions(min = 710)]
277 pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
278 if !kvs.more() {
279 return None;
280 }
281
282 let last = kvs.last()?;
283 let last_key = last.parent_key();
284
285 if let Some(limit) = self.limit.as_mut() {
286 *limit = limit.saturating_sub(kvs.len());
287 if *limit == 0 {
288 return None;
289 }
290 }
291
292 if self.reverse {
293 self.end.make_first_greater_or_equal(last_key);
294 } else {
295 self.begin.make_first_greater_than(last_key);
296 }
297 Some(self)
298 }
299}
300
301impl Default for RangeOption<'_> {
302 fn default() -> Self {
303 Self {
304 begin: KeySelector::first_greater_or_equal([].as_ref()),
305 end: KeySelector::first_greater_or_equal([].as_ref()),
306 limit: None,
307 target_bytes: 0,
308 mode: options::StreamingMode::Iterator,
309 reverse: false,
310 __non_exhaustive: std::marker::PhantomData,
311 }
312 }
313}
314
315impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
316 fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
317 Self {
318 begin,
319 end,
320 ..Self::default()
321 }
322 }
323}
324impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
325 fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
326 Self {
327 begin: KeySelector::first_greater_or_equal(begin),
328 end: KeySelector::first_greater_or_equal(end),
329 ..Self::default()
330 }
331 }
332}
333impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
334 fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
335 Self {
336 begin: KeySelector::first_greater_or_equal(begin),
337 end: KeySelector::first_greater_or_equal(end),
338 ..Self::default()
339 }
340 }
341}
342impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
343 fn from(range: Range<KeySelector<'a>>) -> Self {
344 RangeOption::from((range.start, range.end))
345 }
346}
347
348impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
349 fn from(range: Range<&'a [u8]>) -> Self {
350 RangeOption::from((range.start, range.end))
351 }
352}
353
354impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
355 fn from(range: Range<Vec<u8>>) -> Self {
356 RangeOption::from((range.start, range.end))
357 }
358}
359
360impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
361 fn from(range: RangeInclusive<&'a [u8]>) -> Self {
362 let (start, end) = range.into_inner();
363 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
364 }
365}
366
367impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
368 fn from(range: RangeInclusive<Vec<u8>>) -> Self {
369 let (start, end) = range.into_inner();
370 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
371 }
372}
373
374impl Transaction {
375 pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
376 Self { inner }
377 }
378
379 /// Called to set an option on an FDBTransaction.
380 pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
381 unsafe { opt.apply(self.inner.as_ptr()) }
382 }
383
384 /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
385 /// where the code and data will be provided as raw, in order to avoid deserializing to an option
386 /// and serializing it back to code and data.
387 /// In general, you should use `set_option`.
388 pub fn set_raw_option(
389 &self,
390 code: fdb_sys::FDBTransactionOption,
391 data: Option<Vec<u8>>,
392 ) -> FdbResult<()> {
393 let (data_ptr, size) = data
394 .as_ref()
395 .map(|data| {
396 (
397 data.as_ptr(),
398 i32::try_from(data.len()).expect("len to fit in i32"),
399 )
400 })
401 .unwrap_or_else(|| (std::ptr::null(), 0));
402 let err = unsafe {
403 fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
404 };
405 if err != 0 {
406 Err(FdbError::from_code(err))
407 } else {
408 Ok(())
409 }
410 }
411
412 /// Modify the database snapshot represented by transaction to change the given
413 /// key to have the given value.
414 ///
415 /// If the given key was not previously present in the database it is inserted.
416 /// The modification affects the actual database only if transaction is later
417 /// committed with `Transaction::commit`.
418 ///
419 /// # Arguments
420 ///
421 /// * `key` - the name of the key to be inserted into the database.
422 /// * `value` - the value to be inserted into the database
423 pub fn set(&self, key: &[u8], value: &[u8]) {
424 unsafe {
425 fdb_sys::fdb_transaction_set(
426 self.inner.as_ptr(),
427 key.as_ptr(),
428 fdb_len(key.len(), "key"),
429 value.as_ptr(),
430 fdb_len(value.len(), "value"),
431 )
432 }
433 }
434
435 /// Modify the database snapshot represented by transaction to remove the given key from the
436 /// database.
437 ///
438 /// If the key was not previously present in the database, there is no effect. The modification
439 /// affects the actual database only if transaction is later committed with
440 /// `Transaction::commit`.
441 ///
442 /// # Arguments
443 ///
444 /// * `key` - the name of the key to be removed from the database.
445 pub fn clear(&self, key: &[u8]) {
446 unsafe {
447 fdb_sys::fdb_transaction_clear(
448 self.inner.as_ptr(),
449 key.as_ptr(),
450 fdb_len(key.len(), "key"),
451 )
452 }
453 }
454
455 /// Reads a value from the database snapshot represented by transaction.
456 ///
457 /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
458 ///
459 /// # Arguments
460 ///
461 /// * `key` - the name of the key to be looked up in the database
462 /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
463 pub fn get(
464 &self,
465 key: &[u8],
466 snapshot: bool,
467 ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
468 FdbFuture::new(unsafe {
469 fdb_sys::fdb_transaction_get(
470 self.inner.as_ptr(),
471 key.as_ptr(),
472 fdb_len(key.len(), "key"),
473 fdb_bool(snapshot),
474 )
475 })
476 }
477
478 /// Modify the database snapshot represented by transaction to perform the operation indicated
479 /// by operationType with operand param to the value stored by the given key.
480 ///
481 /// An atomic operation is a single database command that carries out several logical steps:
482 /// reading the value of a key, performing a transformation on that value, and writing the
483 /// result. Different atomic operations perform different transformations. Like other database
484 /// operations, an atomic operation is used within a transaction; however, its use within a
485 /// transaction will not cause the transaction to conflict.
486 ///
487 /// Atomic operations do not expose the current value of the key to the client but simply send
488 /// the database the transformation to apply. In regard to conflict checking, an atomic
489 /// operation is equivalent to a write without a read. It can only cause other transactions
490 /// performing reads of the key to conflict.
491 ///
492 /// By combining these logical steps into a single, read-free operation, FoundationDB can
493 /// guarantee that the transaction will not conflict due to the operation. This makes atomic
494 /// operations ideal for operating on keys that are frequently modified. A common example is
495 /// the use of a key-value pair as a counter.
496 ///
497 /// # Warning
498 ///
499 /// If a transaction uses both an atomic operation and a strictly serializable read on the same
500 /// key, the benefits of using the atomic operation (for both conflict checking and performance)
501 /// are lost.
502 pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
503 unsafe {
504 fdb_sys::fdb_transaction_atomic_op(
505 self.inner.as_ptr(),
506 key.as_ptr(),
507 fdb_len(key.len(), "key"),
508 param.as_ptr(),
509 fdb_len(param.len(), "param"),
510 op_type.code(),
511 )
512 }
513 }
514
515 /// Resolves a key selector against the keys in the database snapshot represented by
516 /// transaction.
517 ///
518 /// Returns an FDBFuture which will be set to the key in the database matching the key
519 /// selector.
520 ///
521 /// # Arguments
522 ///
523 /// * `selector`: the key selector
524 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
525 pub fn get_key(
526 &self,
527 selector: &KeySelector,
528 snapshot: bool,
529 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
530 let key = selector.key();
531 FdbFuture::new(unsafe {
532 fdb_sys::fdb_transaction_get_key(
533 self.inner.as_ptr(),
534 key.as_ptr(),
535 fdb_len(key.len(), "key"),
536 fdb_bool(selector.or_equal()),
537 selector.offset(),
538 fdb_bool(snapshot),
539 )
540 })
541 }
542
543 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
544 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
545 /// equal to the key resolved by the begin key selector and lexicographically less than the key
546 /// resolved by the end key selector.
547 ///
548 /// Returns a stream of KeyValue slices.
549 ///
550 /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
551 /// use.
552 ///
553 /// # Arguments
554 ///
555 /// * `opt`: the range, limit, target_bytes and mode
556 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
557 pub fn get_ranges<'a>(
558 &'a self,
559 opt: RangeOption<'a>,
560 snapshot: bool,
561 ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
562 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
563 if let Some(opt) = maybe_opt {
564 Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
565 move |maybe_values| {
566 let next_opt = match &maybe_values {
567 Ok(values) => opt.next_range(values),
568 Err(..) => None,
569 };
570 Some((maybe_values, (iteration + 1, next_opt)))
571 },
572 ))
573 } else {
574 Either::Right(future::ready(None))
575 }
576 })
577 }
578
579 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
580 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
581 /// equal to the key resolved by the begin key selector and lexicographically less than the key
582 /// resolved by the end key selector.
583 ///
584 /// Returns a stream of KeyValue.
585 ///
586 /// # Arguments
587 ///
588 /// * `opt`: the range, limit, target_bytes and mode
589 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
590 pub fn get_ranges_keyvalues<'a>(
591 &'a self,
592 opt: RangeOption<'a>,
593 snapshot: bool,
594 ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
595 self.get_ranges(opt, snapshot)
596 .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
597 .try_flatten()
598 }
599
600 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
601 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
602 /// equal to the key resolved by the begin key selector and lexicographically less than the key
603 /// resolved by the end key selector.
604 ///
605 /// # Arguments
606 ///
607 /// * `opt`: the range, limit, target_bytes and mode
608 /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
609 /// by 1 for each successive call while reading this range. In all other cases it is ignored.
610 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
611 pub fn get_range(
612 &self,
613 opt: &RangeOption,
614 iteration: usize,
615 snapshot: bool,
616 ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
617 let begin = &opt.begin;
618 let end = &opt.end;
619 let key_begin = begin.key();
620 let key_end = end.key();
621
622 FdbFuture::new(unsafe {
623 fdb_sys::fdb_transaction_get_range(
624 self.inner.as_ptr(),
625 key_begin.as_ptr(),
626 fdb_len(key_begin.len(), "key_begin"),
627 fdb_bool(begin.or_equal()),
628 begin.offset(),
629 key_end.as_ptr(),
630 fdb_len(key_end.len(), "key_end"),
631 fdb_bool(end.or_equal()),
632 end.offset(),
633 fdb_limit(opt.limit.unwrap_or(0)),
634 fdb_limit(opt.target_bytes),
635 opt.mode.code(),
636 fdb_iteration(iteration),
637 fdb_bool(snapshot),
638 fdb_bool(opt.reverse),
639 )
640 })
641 }
642
643 /// Mapped Range is an experimental feature introduced in FDB 7.1.
644 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
645 /// In such a case, querying records by scanning an index in relational databases can be
646 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
647 ///
648 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
649 /// this can happen in one request without additional back and forth. Considering the overhead
650 /// of each request, this saves time and resources on serialization, deserialization, and network.
651 ///
652 /// A mapped request will:
653 ///
654 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
655 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
656 /// * Put all results in a nested structure and return them.
657 ///
658 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
659 /// using snapshot isolation AND disabling read-your-writes.
660 ///
661 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
662 ///
663 /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
664 #[cfg_api_versions(min = 710)]
665 pub fn get_mapped_range(
666 &self,
667 opt: &RangeOption,
668 mapper: &[u8],
669 iteration: usize,
670 snapshot: bool,
671 ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
672 let begin = &opt.begin;
673 let end = &opt.end;
674 let key_begin = begin.key();
675 let key_end = end.key();
676
677 FdbFuture::new(unsafe {
678 fdb_sys::fdb_transaction_get_mapped_range(
679 self.inner.as_ptr(),
680 key_begin.as_ptr(),
681 fdb_len(key_begin.len(), "key_begin"),
682 fdb_bool(begin.or_equal()),
683 begin.offset(),
684 key_end.as_ptr(),
685 fdb_len(key_end.len(), "key_end"),
686 fdb_bool(end.or_equal()),
687 end.offset(),
688 mapper.as_ptr(),
689 fdb_len(mapper.len(), "mapper_length"),
690 fdb_limit(opt.limit.unwrap_or(0)),
691 fdb_limit(opt.target_bytes),
692 opt.mode.code(),
693 fdb_iteration(iteration),
694 fdb_bool(snapshot),
695 fdb_bool(opt.reverse),
696 )
697 })
698 }
699
700 /// Mapped Range is an experimental feature introduced in FDB 7.1.
701 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
702 /// In such a case, querying records by scanning an index in relational databases can be
703 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
704 ///
705 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
706 /// this can happen in one request without additional back and forth. Considering the overhead
707 /// of each request, this saves time and resources on serialization, deserialization, and network.
708 ///
709 /// A mapped request will:
710 ///
711 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
712 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
713 /// * Put all results in a nested structure and return them.
714 ///
715 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
716 /// using snapshot isolation AND disabling read-your-writes.
717 ///
718 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
719 #[cfg_api_versions(min = 710)]
720 pub fn get_mapped_ranges<'a>(
721 &'a self,
722 opt: RangeOption<'a>,
723 mapper: &'a [u8],
724 snapshot: bool,
725 ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
726 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
727 if let Some(opt) = maybe_opt {
728 Either::Left(
729 self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
730 .map(move |maybe_values| {
731 let next_opt = match &maybe_values {
732 Ok(values) => opt.next_mapped_range(values),
733 Err(..) => None,
734 };
735 Some((maybe_values, (iteration + 1, next_opt)))
736 }),
737 )
738 } else {
739 Either::Right(future::ready(None))
740 }
741 })
742 }
743
744 /// Modify the database snapshot represented by transaction to remove all keys (if any) which
745 /// are lexicographically greater than or equal to the given begin key and lexicographically
746 /// less than the given end_key.
747 ///
748 /// The modification affects the actual database only if transaction is later committed with
749 /// `Transaction::commit`.
750 pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
751 unsafe {
752 fdb_sys::fdb_transaction_clear_range(
753 self.inner.as_ptr(),
754 begin.as_ptr(),
755 fdb_len(begin.len(), "begin"),
756 end.as_ptr(),
757 fdb_len(end.len(), "end"),
758 )
759 }
760 }
761
762 /// Get the estimated byte size of the key range based on the byte sample collected by FDB
763 #[cfg_api_versions(min = 630)]
764 pub fn get_estimated_range_size_bytes(
765 &self,
766 begin: &[u8],
767 end: &[u8],
768 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
769 FdbFuture::<i64>::new(unsafe {
770 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
771 self.inner.as_ptr(),
772 begin.as_ptr(),
773 fdb_len(begin.len(), "begin"),
774 end.as_ptr(),
775 fdb_len(end.len(), "end"),
776 )
777 })
778 }
779
780 /// Attempts to commit the sets and clears previously applied to the database snapshot
781 /// represented by transaction to the actual database.
782 ///
783 /// The commit may or may not succeed – in particular, if a conflicting transaction previously
784 /// committed, then the commit must fail in order to preserve transactional isolation. If the
785 /// commit does succeed, the transaction is durably committed to the database and all
786 /// subsequently started transactions will observe its effects.
787 ///
788 /// It is not necessary to commit a read-only transaction – you can simply drop it.
789 ///
790 /// Callers will usually want to retry a transaction if the commit or a another method on the
791 /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
792 ///
793 /// As with other client/server databases, in some failure scenarios a client may be unable to
794 /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
795 /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
796 /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
797 /// could execute the transaction twice. In these cases, you must consider the idempotence of
798 /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
799 ///
800 /// Normally, commit will wait for outstanding reads to return. However, if those reads were
801 /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
802 /// any outstanding reads will immediately return errors.
803 pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
804 FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
805 move |r| match r {
806 Ok(()) => Ok(TransactionCommitted { tr: self }),
807 Err(err) => Err(TransactionCommitError { tr: self, err }),
808 },
809 )
810 }
811
812 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
813 /// which of the error codes generated by other `Transaction` functions represent temporary
814 /// error conditions and which represent application errors that should be handled by the
815 /// application. It also implements an exponential backoff strategy to avoid swamping the
816 /// database cluster with excessive retries when there is a high level of conflict between
817 /// transactions.
818 ///
819 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
820 /// transaction has already been reset.
821 ///
822 /// You should not call this method most of the times and use `Database::transact` which
823 /// implements a retry loop strategy for you.
824 pub fn on_error(
825 self,
826 err: FdbError,
827 ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
828 FdbFuture::<()>::new(unsafe {
829 fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
830 })
831 .map_ok(|()| self)
832 }
833
834 /// Cancels the transaction. All pending or future uses of the transaction will return a
835 /// transaction_cancelled error. The transaction can be used again after it is reset.
836 pub fn cancel(self) -> TransactionCancelled {
837 unsafe { fdb_sys::fdb_transaction_cancel(self.inner.as_ptr()) };
838 TransactionCancelled { tr: self }
839 }
840
841 /// Returns a list of public network addresses as strings, one for each of the storage servers
842 /// responsible for storing key_name and its associated value.
843 pub fn get_addresses_for_key(
844 &self,
845 key: &[u8],
846 ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
847 FdbFuture::new(unsafe {
848 fdb_sys::fdb_transaction_get_addresses_for_key(
849 self.inner.as_ptr(),
850 key.as_ptr(),
851 fdb_len(key.len(), "key"),
852 )
853 })
854 }
855
856 /// A watch's behavior is relative to the transaction that created it. A watch will report a
857 /// change in relation to the key’s value as readable by that transaction. The initial value
858 /// used for comparison is either that of the transaction’s read version or the value as
859 /// modified by the transaction itself prior to the creation of the watch. If the value changes
860 /// and then changes back to its initial value, the watch might not report the change.
861 ///
862 /// Until the transaction that created it has been committed, a watch will not report changes
863 /// made by other transactions. In contrast, a watch will immediately report changes made by
864 /// the transaction itself. Watches cannot be created if the transaction has set the
865 /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
866 /// watches_disabled error.
867 ///
868 /// If the transaction used to create a watch encounters an error during commit, then the watch
869 /// will be set with that error. A transaction whose commit result is unknown will set all of
870 /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
871 /// destroyed, then any watches it created will be set with the transaction_cancelled error.
872 ///
873 /// Returns an future representing an empty value that will be set once the watch has
874 /// detected a change to the value at the specified key.
875 ///
876 /// By default, each database connection can have no more than 10,000 watches that have not yet
877 /// reported a change. When this number is exceeded, an attempt to create a watch will return a
878 /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
879 /// Because a watch outlives the transaction that creates it, any watch that is no longer
880 /// needed should be cancelled by dropping its future.
881 pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
882 FdbFuture::new(unsafe {
883 fdb_sys::fdb_transaction_watch(
884 self.inner.as_ptr(),
885 key.as_ptr(),
886 fdb_len(key.len(), "key"),
887 )
888 })
889 }
890
891 /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
892 /// returned future, which is the summation of the estimated size of mutations, read conflict
893 /// ranges, and write conflict ranges.
894 ///
895 /// This can be called multiple times before the transaction is committed.
896 #[cfg_api_versions(min = 620)]
897 pub fn get_approximate_size(
898 &self,
899 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
900 FdbFuture::new(unsafe {
901 fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
902 })
903 }
904
905 /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
906 /// Note: the returned split points contain the start key and end key of the given range.
907 #[cfg_api_versions(min = 700)]
908 pub fn get_range_split_points(
909 &self,
910 begin: &[u8],
911 end: &[u8],
912 chunk_size: i64,
913 ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
914 FdbFuture::<FdbKeys>::new(unsafe {
915 fdb_sys::fdb_transaction_get_range_split_points(
916 self.inner.as_ptr(),
917 begin.as_ptr(),
918 fdb_len(begin.len(), "begin"),
919 end.as_ptr(),
920 fdb_len(end.len(), "end"),
921 chunk_size,
922 )
923 })
924 }
925
926 /// Returns an FDBFuture which will be set to the versionstamp which was used by any
927 /// versionstamp operations in this transaction.
928 ///
929 /// The future will be ready only after the successful completion of a call to `commit()` on
930 /// this Transaction. Read-only transactions do not modify the database when committed and will
931 /// result in the future completing with an error. Keep in mind that a transaction which reads
932 /// keys and then sets them to their current values may be optimized to a read-only transaction.
933 ///
934 /// Most applications will not call this function.
935 pub fn get_versionstamp(
936 &self,
937 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
938 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
939 }
940
941 /// The transaction obtains a snapshot read version automatically at the time of the first call
942 /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
943 /// compromised by transaction options) is guaranteed to represent all transactions which were
944 /// reported committed before that call.
945 pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
946 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
947 }
948
949 /// Sets the snapshot read version used by a transaction.
950 ///
951 /// This is not needed in simple cases.
952 /// If the given version is too old, subsequent reads will fail with error_code_past_version;
953 /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
954 /// error_code_future_version. If any of get_*() have been called on this transaction already,
955 /// the result is undefined.
956 pub fn set_read_version(&self, version: i64) {
957 unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
958 }
959
960 /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
961 /// The value of this key is sent to clients along with the read version from the proxy,
962 /// so a client can read its value without communicating with a storage server.
963 /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
964 #[cfg_api_versions(min = 610)]
965 pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
966 match self.get(METADATA_VERSION_KEY, snapshot).await {
967 Ok(Some(fdb_slice)) => {
968 let value = fdb_slice.deref();
969 // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
970 // can we assume that it will always be the correct size?
971 if value.len() < 8 {
972 return Ok(None);
973 }
974
975 // The 80-bits versionstamps are 10 bytes longs, and are composed of:
976 // * 8 bytes (Transaction Version)
977 // * followed by 2 bytes (Transaction Batch Order)
978 // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
979 let mut arr = [0u8; 8];
980 arr.copy_from_slice(&value[0..8]);
981 let transaction_version: i64 = i64::from_be_bytes(arr);
982
983 Ok(Some(transaction_version))
984 }
985 Ok(None) => Ok(None),
986
987 Err(err) => Err(err),
988 }
989 }
990
991 #[cfg_api_versions(min = 610)]
992 pub fn update_metadata_version(&self) {
993 // The param is transformed by removing the final four bytes from ``param`` and reading
994 // those as a little-Endian 32-bit integer to get a position ``pos``.
995 // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
996 // versionstamp of the transaction used. The first byte of the parameter is position 0.
997 // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
998 let param = vec![0u8; 14];
999 self.atomic_op(
1000 METADATA_VERSION_KEY,
1001 param.as_slice(),
1002 options::MutationType::SetVersionstampedValue,
1003 )
1004 }
1005
1006 /// Reset transaction to its initial state.
1007 ///
1008 /// In order to protect against a race condition with cancel(), this call require a mutable
1009 /// access to the transaction.
1010 ///
1011 /// This is similar to dropping the transaction and creating a new one.
1012 ///
1013 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1014 /// transaction has already been reset.
1015 pub fn reset(&mut self) {
1016 unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1017 }
1018
1019 /// Adds a conflict range to a transaction without performing the associated read or write.
1020 ///
1021 /// # Note
1022 ///
1023 /// Most applications will use the serializable isolation that transactions provide by default
1024 /// and will not need to manipulate conflict ranges.
1025 pub fn add_conflict_range(
1026 &self,
1027 begin: &[u8],
1028 end: &[u8],
1029 ty: options::ConflictRangeType,
1030 ) -> FdbResult<()> {
1031 error::eval(unsafe {
1032 fdb_sys::fdb_transaction_add_conflict_range(
1033 self.inner.as_ptr(),
1034 begin.as_ptr(),
1035 fdb_len(begin.len(), "begin"),
1036 end.as_ptr(),
1037 fdb_len(end.len(), "end"),
1038 ty.code(),
1039 )
1040 })
1041 }
1042}
1043
1044impl Drop for Transaction {
1045 fn drop(&mut self) {
1046 unsafe {
1047 fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1048 }
1049 }
1050}
1051
1052/// A retryable transaction, generated by Database.run
1053#[derive(Clone)]
1054pub struct RetryableTransaction {
1055 inner: Arc<Transaction>,
1056}
1057
1058impl Deref for RetryableTransaction {
1059 type Target = Transaction;
1060 fn deref(&self) -> &Transaction {
1061 self.inner.deref()
1062 }
1063}
1064
1065impl RetryableTransaction {
1066 pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1067 RetryableTransaction { inner: Arc::new(t) }
1068 }
1069
1070 pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1071 // checking weak references
1072 if Arc::weak_count(&self.inner) != 0 {
1073 return Err(FdbBindingError::ReferenceToTransactionKept);
1074 }
1075 Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1076 }
1077
1078 pub(crate) async fn on_error(
1079 self,
1080 err: FdbError,
1081 ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1082 Ok(self
1083 .take()?
1084 .on_error(err)
1085 .await
1086 .map(RetryableTransaction::new))
1087 }
1088
1089 pub(crate) async fn commit(
1090 self,
1091 ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1092 Ok(self.take()?.commit().await)
1093 }
1094}