foundationdb/transaction.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::options;
22
23use crate::{error, FdbError, FdbResult};
24use foundationdb_macros::cfg_api_versions;
25
26use crate::error::FdbBindingError;
27
28use futures::{
29 future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
30};
31
/// Raw bytes of the system key `\xff/metadataVersion`.
///
/// NOTE(review): presumably consumed by metadata-version helpers defined later in this file
/// (not visible in this chunk) — confirm before relying on it elsewhere.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
34
/// A committed transaction.
///
/// Produced by `Transaction::commit` when the commit succeeds. It keeps ownership of the
/// transaction so the committed version can be queried, or the transaction reset and reused.
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCommitted {
    /// The transaction whose commit succeeded.
    tr: Transaction,
}
41
42impl TransactionCommitted {
43 /// Retrieves the database version number at which a given transaction was committed.
44 ///
45 /// Read-only transactions do not modify the database when committed and will have a committed
46 /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
47 /// current values may be optimized to a read-only transaction.
48 ///
49 /// Note that database versions are not necessarily unique to a given transaction and so cannot
50 /// be used to determine in what order two transactions completed. The only use for this
51 /// function is to manually enforce causal consistency when calling `set_read_version()` on
52 /// another subsequent transaction.
53 ///
54 /// Most applications will not call this function.
55 pub fn committed_version(&self) -> FdbResult<i64> {
56 let mut version: i64 = 0;
57 error::eval(unsafe {
58 fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
59 })?;
60 Ok(version)
61 }
62
63 /// Reset the transaction to its initial state.
64 ///
65 /// This will not affect previously committed data.
66 ///
67 /// This is similar to dropping the transaction and creating a new one.
68 pub fn reset(mut self) -> Transaction {
69 self.tr.reset();
70 self.tr
71 }
72}
73impl From<TransactionCommitted> for Transaction {
74 fn from(tc: TransactionCommitted) -> Transaction {
75 tc.reset()
76 }
77}
78
/// A transaction that failed to commit.
///
/// Produced by `Transaction::commit` when the commit fails. It keeps both the transaction and
/// the error, so the caller can retry through `on_error` or reset and reuse the transaction.
pub struct TransactionCommitError {
    /// The transaction whose commit failed.
    tr: Transaction,
    /// The error reported by the commit.
    err: FdbError,
}
84
85impl TransactionCommitError {
86 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
87 /// which of the error codes generated by other `Transaction` functions represent temporary
88 /// error conditions and which represent application errors that should be handled by the
89 /// application. It also implements an exponential backoff strategy to avoid swamping the
90 /// database cluster with excessive retries when there is a high level of conflict between
91 /// transactions.
92 ///
93 /// You should not call this method most of the times and use `Database::transact` which
94 /// implements a retry loop strategy for you.
95 pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
96 FdbFuture::<()>::new(unsafe {
97 fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
98 })
99 .map_ok(|()| self.tr)
100 }
101
102 /// Reset the transaction to its initial state.
103 ///
104 /// This is similar to dropping the transaction and creating a new one.
105 pub fn reset(mut self) -> Transaction {
106 self.tr.reset();
107 self.tr
108 }
109}
110
111impl Deref for TransactionCommitError {
112 type Target = FdbError;
113 fn deref(&self) -> &FdbError {
114 &self.err
115 }
116}
117
118impl From<TransactionCommitError> for FdbError {
119 fn from(tce: TransactionCommitError) -> FdbError {
120 tce.err
121 }
122}
123
124impl fmt::Debug for TransactionCommitError {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 write!(f, "TransactionCommitError({})", self.err)
127 }
128}
129
130impl fmt::Display for TransactionCommitError {
131 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132 self.err.fmt(f)
133 }
134}
135
136/// The result of `Transaction::Commit`
137type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
138
/// A cancelled transaction.
///
/// Produced by `Transaction::cancel`. It keeps ownership of the transaction, which can be made
/// usable again with `reset`.
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCancelled {
    /// The transaction that was cancelled.
    tr: Transaction,
}
145impl TransactionCancelled {
146 /// Reset the transaction to its initial state.
147 ///
148 /// This is similar to dropping the transaction and creating a new one.
149 pub fn reset(mut self) -> Transaction {
150 self.tr.reset();
151 self.tr
152 }
153}
154impl From<TransactionCancelled> for Transaction {
155 fn from(tc: TransactionCancelled) -> Transaction {
156 tc.reset()
157 }
158}
159
160/// In FoundationDB, a transaction is a mutable snapshot of a database.
161///
162/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
163///
164/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
165///
166/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
167///
168/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
169#[derive(Debug)]
170pub struct Transaction {
171 // Order of fields should not be changed, because Rust drops field top-to-bottom, and
172 // transaction should be dropped before cluster.
173 inner: NonNull<fdb_sys::FDBTransaction>,
174}
175unsafe impl Send for Transaction {}
176unsafe impl Sync for Transaction {}
177
178/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
179#[inline]
180fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
181 if v {
182 1
183 } else {
184 0
185 }
186}
/// Converts a Rust buffer length into the C API's `int` length argument.
///
/// `context` names the buffer and is only used in the panic message.
///
/// # Panics
///
/// Panics with `"<context>.len() > i32::MAX"` when `len` does not fit in an `i32`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    match i32::try_from(len) {
        Ok(fits) => fits,
        Err(_) => panic!("{}.len() > i32::MAX", context),
    }
}
/// Converts a range-read iteration counter into the C API's `int` argument.
///
/// Counters that do not fit in an `i32` are mapped to `0`, which the C API rejects with
/// `client_invalid_operation`.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a row/byte limit into the C API's `int` argument, saturating at `i32::MAX`.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    i32::try_from(v).unwrap_or(i32::MAX)
}
208
209/// `RangeOption` represents a query parameters for range scan query.
210///
211/// You can construct `RangeOption` easily:
212///
213/// ```
214/// use foundationdb::RangeOption;
215///
216/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
217/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
218/// let opt = RangeOption {
219/// limit: Some(10),
220/// ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
221/// };
222/// ```
223#[derive(Debug, Clone)]
224pub struct RangeOption<'a> {
225 /// The beginning of the range.
226 pub begin: KeySelector<'a>,
227 /// The end of the range.
228 pub end: KeySelector<'a>,
229 /// If non-zero, indicates the maximum number of key-value pairs to return.
230 pub limit: Option<usize>,
231 /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
232 /// return for each item.
233 pub target_bytes: usize,
234 /// One of the options::StreamingMode values indicating how the caller would like the data in
235 /// the range returned.
236 pub mode: options::StreamingMode,
237 /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
238 /// the end of the range.
239 pub reverse: bool,
240 #[doc(hidden)]
241 pub __non_exhaustive: std::marker::PhantomData<()>,
242}
243
244impl RangeOption<'_> {
245 /// Reverses the range direction.
246 pub fn rev(mut self) -> Self {
247 self.reverse = !self.reverse;
248 self
249 }
250
251 pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
252 if !kvs.more() {
253 return None;
254 }
255
256 let last = kvs.last()?;
257 let last_key = last.key();
258
259 if let Some(limit) = self.limit.as_mut() {
260 *limit = limit.saturating_sub(kvs.len());
261 if *limit == 0 {
262 return None;
263 }
264 }
265
266 if self.reverse {
267 self.end.make_first_greater_or_equal(last_key);
268 } else {
269 self.begin.make_first_greater_than(last_key);
270 }
271 Some(self)
272 }
273
274 #[cfg_api_versions(min = 710)]
275 pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
276 if !kvs.more() {
277 return None;
278 }
279
280 let last = kvs.last()?;
281 let last_key = last.parent_key();
282
283 if let Some(limit) = self.limit.as_mut() {
284 *limit = limit.saturating_sub(kvs.len());
285 if *limit == 0 {
286 return None;
287 }
288 }
289
290 if self.reverse {
291 self.end.make_first_greater_or_equal(last_key);
292 } else {
293 self.begin.make_first_greater_than(last_key);
294 }
295 Some(self)
296 }
297}
298
299impl Default for RangeOption<'_> {
300 fn default() -> Self {
301 Self {
302 begin: KeySelector::first_greater_or_equal([].as_ref()),
303 end: KeySelector::first_greater_or_equal([].as_ref()),
304 limit: None,
305 target_bytes: 0,
306 mode: options::StreamingMode::Iterator,
307 reverse: false,
308 __non_exhaustive: std::marker::PhantomData,
309 }
310 }
311}
312
313impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
314 fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
315 Self {
316 begin,
317 end,
318 ..Self::default()
319 }
320 }
321}
322impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
323 fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
324 Self {
325 begin: KeySelector::first_greater_or_equal(begin),
326 end: KeySelector::first_greater_or_equal(end),
327 ..Self::default()
328 }
329 }
330}
331impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
332 fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
333 Self {
334 begin: KeySelector::first_greater_or_equal(begin),
335 end: KeySelector::first_greater_or_equal(end),
336 ..Self::default()
337 }
338 }
339}
340impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
341 fn from(range: Range<KeySelector<'a>>) -> Self {
342 RangeOption::from((range.start, range.end))
343 }
344}
345
346impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
347 fn from(range: Range<&'a [u8]>) -> Self {
348 RangeOption::from((range.start, range.end))
349 }
350}
351
352impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
353 fn from(range: Range<Vec<u8>>) -> Self {
354 RangeOption::from((range.start, range.end))
355 }
356}
357
358impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
359 fn from(range: RangeInclusive<&'a [u8]>) -> Self {
360 let (start, end) = range.into_inner();
361 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
362 }
363}
364
365impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
366 fn from(range: RangeInclusive<Vec<u8>>) -> Self {
367 let (start, end) = range.into_inner();
368 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
369 }
370}
371
372impl Transaction {
373 pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
374 Self { inner }
375 }
376
377 /// Called to set an option on an FDBTransaction.
378 pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
379 unsafe { opt.apply(self.inner.as_ptr()) }
380 }
381
382 /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
383 /// where the code and data will be provided as raw, in order to avoid deserializing to an option
384 /// and serializing it back to code and data.
385 /// In general, you should use `set_option`.
386 pub fn set_raw_option(
387 &self,
388 code: fdb_sys::FDBTransactionOption,
389 data: Option<Vec<u8>>,
390 ) -> FdbResult<()> {
391 let (data_ptr, size) = data
392 .as_ref()
393 .map(|data| {
394 (
395 data.as_ptr(),
396 i32::try_from(data.len()).expect("len to fit in i32"),
397 )
398 })
399 .unwrap_or_else(|| (std::ptr::null(), 0));
400 let err = unsafe {
401 fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
402 };
403 if err != 0 {
404 Err(FdbError::from_code(err))
405 } else {
406 Ok(())
407 }
408 }
409
410 /// Modify the database snapshot represented by transaction to change the given
411 /// key to have the given value.
412 ///
413 /// If the given key was not previously present in the database it is inserted.
414 /// The modification affects the actual database only if transaction is later
415 /// committed with `Transaction::commit`.
416 ///
417 /// # Arguments
418 ///
419 /// * `key` - the name of the key to be inserted into the database.
420 /// * `value` - the value to be inserted into the database
421 pub fn set(&self, key: &[u8], value: &[u8]) {
422 unsafe {
423 fdb_sys::fdb_transaction_set(
424 self.inner.as_ptr(),
425 key.as_ptr(),
426 fdb_len(key.len(), "key"),
427 value.as_ptr(),
428 fdb_len(value.len(), "value"),
429 )
430 }
431 }
432
433 /// Modify the database snapshot represented by transaction to remove the given key from the
434 /// database.
435 ///
436 /// If the key was not previously present in the database, there is no effect. The modification
437 /// affects the actual database only if transaction is later committed with
438 /// `Transaction::commit`.
439 ///
440 /// # Arguments
441 ///
442 /// * `key` - the name of the key to be removed from the database.
443 pub fn clear(&self, key: &[u8]) {
444 unsafe {
445 fdb_sys::fdb_transaction_clear(
446 self.inner.as_ptr(),
447 key.as_ptr(),
448 fdb_len(key.len(), "key"),
449 )
450 }
451 }
452
453 /// Reads a value from the database snapshot represented by transaction.
454 ///
455 /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
456 ///
457 /// # Arguments
458 ///
459 /// * `key` - the name of the key to be looked up in the database
460 /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
461 pub fn get(
462 &self,
463 key: &[u8],
464 snapshot: bool,
465 ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
466 FdbFuture::new(unsafe {
467 fdb_sys::fdb_transaction_get(
468 self.inner.as_ptr(),
469 key.as_ptr(),
470 fdb_len(key.len(), "key"),
471 fdb_bool(snapshot),
472 )
473 })
474 }
475
476 /// Modify the database snapshot represented by transaction to perform the operation indicated
477 /// by operationType with operand param to the value stored by the given key.
478 ///
479 /// An atomic operation is a single database command that carries out several logical steps:
480 /// reading the value of a key, performing a transformation on that value, and writing the
481 /// result. Different atomic operations perform different transformations. Like other database
482 /// operations, an atomic operation is used within a transaction; however, its use within a
483 /// transaction will not cause the transaction to conflict.
484 ///
485 /// Atomic operations do not expose the current value of the key to the client but simply send
486 /// the database the transformation to apply. In regard to conflict checking, an atomic
487 /// operation is equivalent to a write without a read. It can only cause other transactions
488 /// performing reads of the key to conflict.
489 ///
490 /// By combining these logical steps into a single, read-free operation, FoundationDB can
491 /// guarantee that the transaction will not conflict due to the operation. This makes atomic
492 /// operations ideal for operating on keys that are frequently modified. A common example is
493 /// the use of a key-value pair as a counter.
494 ///
495 /// # Warning
496 ///
497 /// If a transaction uses both an atomic operation and a strictly serializable read on the same
498 /// key, the benefits of using the atomic operation (for both conflict checking and performance)
499 /// are lost.
500 pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
501 unsafe {
502 fdb_sys::fdb_transaction_atomic_op(
503 self.inner.as_ptr(),
504 key.as_ptr(),
505 fdb_len(key.len(), "key"),
506 param.as_ptr(),
507 fdb_len(param.len(), "param"),
508 op_type.code(),
509 )
510 }
511 }
512
513 /// Resolves a key selector against the keys in the database snapshot represented by
514 /// transaction.
515 ///
516 /// Returns an FDBFuture which will be set to the key in the database matching the key
517 /// selector.
518 ///
519 /// # Arguments
520 ///
521 /// * `selector`: the key selector
522 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
523 pub fn get_key(
524 &self,
525 selector: &KeySelector,
526 snapshot: bool,
527 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
528 let key = selector.key();
529 FdbFuture::new(unsafe {
530 fdb_sys::fdb_transaction_get_key(
531 self.inner.as_ptr(),
532 key.as_ptr(),
533 fdb_len(key.len(), "key"),
534 fdb_bool(selector.or_equal()),
535 selector.offset(),
536 fdb_bool(snapshot),
537 )
538 })
539 }
540
541 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
542 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
543 /// equal to the key resolved by the begin key selector and lexicographically less than the key
544 /// resolved by the end key selector.
545 ///
546 /// Returns a stream of KeyValue slices.
547 ///
548 /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
549 /// use.
550 ///
551 /// # Arguments
552 ///
553 /// * `opt`: the range, limit, target_bytes and mode
554 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
555 pub fn get_ranges<'a>(
556 &'a self,
557 opt: RangeOption<'a>,
558 snapshot: bool,
559 ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
560 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
561 if let Some(opt) = maybe_opt {
562 Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
563 move |maybe_values| {
564 let next_opt = match &maybe_values {
565 Ok(values) => opt.next_range(values),
566 Err(..) => None,
567 };
568 Some((maybe_values, (iteration + 1, next_opt)))
569 },
570 ))
571 } else {
572 Either::Right(future::ready(None))
573 }
574 })
575 }
576
577 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
578 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
579 /// equal to the key resolved by the begin key selector and lexicographically less than the key
580 /// resolved by the end key selector.
581 ///
582 /// Returns a stream of KeyValue.
583 ///
584 /// # Arguments
585 ///
586 /// * `opt`: the range, limit, target_bytes and mode
587 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
588 pub fn get_ranges_keyvalues<'a>(
589 &'a self,
590 opt: RangeOption<'a>,
591 snapshot: bool,
592 ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
593 self.get_ranges(opt, snapshot)
594 .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
595 .try_flatten()
596 }
597
598 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
599 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
600 /// equal to the key resolved by the begin key selector and lexicographically less than the key
601 /// resolved by the end key selector.
602 ///
603 /// # Arguments
604 ///
605 /// * `opt`: the range, limit, target_bytes and mode
606 /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
607 /// by 1 for each successive call while reading this range. In all other cases it is ignored.
608 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
609 pub fn get_range(
610 &self,
611 opt: &RangeOption,
612 iteration: usize,
613 snapshot: bool,
614 ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
615 let begin = &opt.begin;
616 let end = &opt.end;
617 let key_begin = begin.key();
618 let key_end = end.key();
619
620 FdbFuture::new(unsafe {
621 fdb_sys::fdb_transaction_get_range(
622 self.inner.as_ptr(),
623 key_begin.as_ptr(),
624 fdb_len(key_begin.len(), "key_begin"),
625 fdb_bool(begin.or_equal()),
626 begin.offset(),
627 key_end.as_ptr(),
628 fdb_len(key_end.len(), "key_end"),
629 fdb_bool(end.or_equal()),
630 end.offset(),
631 fdb_limit(opt.limit.unwrap_or(0)),
632 fdb_limit(opt.target_bytes),
633 opt.mode.code(),
634 fdb_iteration(iteration),
635 fdb_bool(snapshot),
636 fdb_bool(opt.reverse),
637 )
638 })
639 }
640
641 /// Mapped Range is an experimental feature introduced in FDB 7.1.
642 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
643 /// In such a case, querying records by scanning an index in relational databases can be
644 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
645 ///
646 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
647 /// this can happen in one request without additional back and forth. Considering the overhead
648 /// of each request, this saves time and resources on serialization, deserialization, and network.
649 ///
650 /// A mapped request will:
651 ///
652 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
653 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
654 /// * Put all results in a nested structure and return them.
655 ///
656 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
657 /// using snapshot isolation AND disabling read-your-writes.
658 ///
659 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
660 ///
661 /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
662 #[cfg_api_versions(min = 710)]
663 pub fn get_mapped_range(
664 &self,
665 opt: &RangeOption,
666 mapper: &[u8],
667 iteration: usize,
668 snapshot: bool,
669 ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
670 let begin = &opt.begin;
671 let end = &opt.end;
672 let key_begin = begin.key();
673 let key_end = end.key();
674
675 FdbFuture::new(unsafe {
676 fdb_sys::fdb_transaction_get_mapped_range(
677 self.inner.as_ptr(),
678 key_begin.as_ptr(),
679 fdb_len(key_begin.len(), "key_begin"),
680 fdb_bool(begin.or_equal()),
681 begin.offset(),
682 key_end.as_ptr(),
683 fdb_len(key_end.len(), "key_end"),
684 fdb_bool(end.or_equal()),
685 end.offset(),
686 mapper.as_ptr(),
687 fdb_len(mapper.len(), "mapper_length"),
688 fdb_limit(opt.limit.unwrap_or(0)),
689 fdb_limit(opt.target_bytes),
690 opt.mode.code(),
691 fdb_iteration(iteration),
692 fdb_bool(snapshot),
693 fdb_bool(opt.reverse),
694 )
695 })
696 }
697
698 /// Mapped Range is an experimental feature introduced in FDB 7.1.
699 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
700 /// In such a case, querying records by scanning an index in relational databases can be
701 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
702 ///
703 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
704 /// this can happen in one request without additional back and forth. Considering the overhead
705 /// of each request, this saves time and resources on serialization, deserialization, and network.
706 ///
707 /// A mapped request will:
708 ///
709 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
710 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
711 /// * Put all results in a nested structure and return them.
712 ///
713 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
714 /// using snapshot isolation AND disabling read-your-writes.
715 ///
716 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
717 #[cfg_api_versions(min = 710)]
718 pub fn get_mapped_ranges<'a>(
719 &'a self,
720 opt: RangeOption<'a>,
721 mapper: &'a [u8],
722 snapshot: bool,
723 ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
724 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
725 if let Some(opt) = maybe_opt {
726 Either::Left(
727 self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
728 .map(move |maybe_values| {
729 let next_opt = match &maybe_values {
730 Ok(values) => opt.next_mapped_range(values),
731 Err(..) => None,
732 };
733 Some((maybe_values, (iteration + 1, next_opt)))
734 }),
735 )
736 } else {
737 Either::Right(future::ready(None))
738 }
739 })
740 }
741
742 /// Modify the database snapshot represented by transaction to remove all keys (if any) which
743 /// are lexicographically greater than or equal to the given begin key and lexicographically
744 /// less than the given end_key.
745 ///
746 /// The modification affects the actual database only if transaction is later committed with
747 /// `Transaction::commit`.
748 pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
749 unsafe {
750 fdb_sys::fdb_transaction_clear_range(
751 self.inner.as_ptr(),
752 begin.as_ptr(),
753 fdb_len(begin.len(), "begin"),
754 end.as_ptr(),
755 fdb_len(end.len(), "end"),
756 )
757 }
758 }
759
760 /// Get the estimated byte size of the key range based on the byte sample collected by FDB
761 #[cfg_api_versions(min = 630)]
762 pub fn get_estimated_range_size_bytes(
763 &self,
764 begin: &[u8],
765 end: &[u8],
766 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
767 FdbFuture::<i64>::new(unsafe {
768 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
769 self.inner.as_ptr(),
770 begin.as_ptr(),
771 fdb_len(begin.len(), "begin"),
772 end.as_ptr(),
773 fdb_len(end.len(), "end"),
774 )
775 })
776 }
777
778 /// Attempts to commit the sets and clears previously applied to the database snapshot
779 /// represented by transaction to the actual database.
780 ///
781 /// The commit may or may not succeed – in particular, if a conflicting transaction previously
782 /// committed, then the commit must fail in order to preserve transactional isolation. If the
783 /// commit does succeed, the transaction is durably committed to the database and all
784 /// subsequently started transactions will observe its effects.
785 ///
786 /// It is not necessary to commit a read-only transaction – you can simply drop it.
787 ///
788 /// Callers will usually want to retry a transaction if the commit or a another method on the
789 /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
790 ///
791 /// As with other client/server databases, in some failure scenarios a client may be unable to
792 /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
793 /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
794 /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
795 /// could execute the transaction twice. In these cases, you must consider the idempotence of
796 /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
797 ///
798 /// Normally, commit will wait for outstanding reads to return. However, if those reads were
799 /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
800 /// any outstanding reads will immediately return errors.
801 pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
802 FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
803 move |r| match r {
804 Ok(()) => Ok(TransactionCommitted { tr: self }),
805 Err(err) => Err(TransactionCommitError { tr: self, err }),
806 },
807 )
808 }
809
810 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
811 /// which of the error codes generated by other `Transaction` functions represent temporary
812 /// error conditions and which represent application errors that should be handled by the
813 /// application. It also implements an exponential backoff strategy to avoid swamping the
814 /// database cluster with excessive retries when there is a high level of conflict between
815 /// transactions.
816 ///
817 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
818 /// transaction has already been reset.
819 ///
820 /// You should not call this method most of the times and use `Database::transact` which
821 /// implements a retry loop strategy for you.
822 pub fn on_error(
823 self,
824 err: FdbError,
825 ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
826 FdbFuture::<()>::new(unsafe {
827 fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
828 })
829 .map_ok(|()| self)
830 }
831
832 /// Cancels the transaction. All pending or future uses of the transaction will return a
833 /// transaction_cancelled error. The transaction can be used again after it is reset.
834 pub fn cancel(self) -> TransactionCancelled {
835 unsafe { fdb_sys::fdb_transaction_cancel(self.inner.as_ptr()) };
836 TransactionCancelled { tr: self }
837 }
838
839 /// Returns a list of public network addresses as strings, one for each of the storage servers
840 /// responsible for storing key_name and its associated value.
841 pub fn get_addresses_for_key(
842 &self,
843 key: &[u8],
844 ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
845 FdbFuture::new(unsafe {
846 fdb_sys::fdb_transaction_get_addresses_for_key(
847 self.inner.as_ptr(),
848 key.as_ptr(),
849 fdb_len(key.len(), "key"),
850 )
851 })
852 }
853
854 /// A watch's behavior is relative to the transaction that created it. A watch will report a
855 /// change in relation to the key’s value as readable by that transaction. The initial value
856 /// used for comparison is either that of the transaction’s read version or the value as
857 /// modified by the transaction itself prior to the creation of the watch. If the value changes
858 /// and then changes back to its initial value, the watch might not report the change.
859 ///
860 /// Until the transaction that created it has been committed, a watch will not report changes
861 /// made by other transactions. In contrast, a watch will immediately report changes made by
862 /// the transaction itself. Watches cannot be created if the transaction has set the
863 /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
864 /// watches_disabled error.
865 ///
866 /// If the transaction used to create a watch encounters an error during commit, then the watch
867 /// will be set with that error. A transaction whose commit result is unknown will set all of
868 /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
869 /// destroyed, then any watches it created will be set with the transaction_cancelled error.
870 ///
871 /// Returns an future representing an empty value that will be set once the watch has
872 /// detected a change to the value at the specified key.
873 ///
874 /// By default, each database connection can have no more than 10,000 watches that have not yet
875 /// reported a change. When this number is exceeded, an attempt to create a watch will return a
876 /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
877 /// Because a watch outlives the transaction that creates it, any watch that is no longer
878 /// needed should be cancelled by dropping its future.
879 pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
880 FdbFuture::new(unsafe {
881 fdb_sys::fdb_transaction_watch(
882 self.inner.as_ptr(),
883 key.as_ptr(),
884 fdb_len(key.len(), "key"),
885 )
886 })
887 }
888
889 /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
890 /// returned future, which is the summation of the estimated size of mutations, read conflict
891 /// ranges, and write conflict ranges.
892 ///
893 /// This can be called multiple times before the transaction is committed.
894 #[cfg_api_versions(min = 620)]
895 pub fn get_approximate_size(
896 &self,
897 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
898 FdbFuture::new(unsafe {
899 fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
900 })
901 }
902
903 /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
904 /// Note: the returned split points contain the start key and end key of the given range.
905 #[cfg_api_versions(min = 700)]
906 pub fn get_range_split_points(
907 &self,
908 begin: &[u8],
909 end: &[u8],
910 chunk_size: i64,
911 ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
912 FdbFuture::<FdbKeys>::new(unsafe {
913 fdb_sys::fdb_transaction_get_range_split_points(
914 self.inner.as_ptr(),
915 begin.as_ptr(),
916 fdb_len(begin.len(), "begin"),
917 end.as_ptr(),
918 fdb_len(end.len(), "end"),
919 chunk_size,
920 )
921 })
922 }
923
924 /// Returns an FDBFuture which will be set to the versionstamp which was used by any
925 /// versionstamp operations in this transaction.
926 ///
927 /// The future will be ready only after the successful completion of a call to `commit()` on
928 /// this Transaction. Read-only transactions do not modify the database when committed and will
929 /// result in the future completing with an error. Keep in mind that a transaction which reads
930 /// keys and then sets them to their current values may be optimized to a read-only transaction.
931 ///
932 /// Most applications will not call this function.
933 pub fn get_versionstamp(
934 &self,
935 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
936 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
937 }
938
939 /// The transaction obtains a snapshot read version automatically at the time of the first call
940 /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
941 /// compromised by transaction options) is guaranteed to represent all transactions which were
942 /// reported committed before that call.
943 pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
944 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
945 }
946
947 /// Sets the snapshot read version used by a transaction.
948 ///
949 /// This is not needed in simple cases.
950 /// If the given version is too old, subsequent reads will fail with error_code_past_version;
951 /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
952 /// error_code_future_version. If any of get_*() have been called on this transaction already,
953 /// the result is undefined.
954 pub fn set_read_version(&self, version: i64) {
955 unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
956 }
957
958 /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
959 /// The value of this key is sent to clients along with the read version from the proxy,
960 /// so a client can read its value without communicating with a storage server.
961 /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
962 #[cfg_api_versions(min = 610)]
963 pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
964 match self.get(METADATA_VERSION_KEY, snapshot).await {
965 Ok(Some(fdb_slice)) => {
966 let value = fdb_slice.deref();
967 // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
968 // can we assume that it will always be the correct size?
969 if value.len() < 8 {
970 return Ok(None);
971 }
972
973 // The 80-bits versionstamps are 10 bytes longs, and are composed of:
974 // * 8 bytes (Transaction Version)
975 // * followed by 2 bytes (Transaction Batch Order)
976 // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
977 let mut arr = [0u8; 8];
978 arr.copy_from_slice(&value[0..8]);
979 let transaction_version: i64 = i64::from_be_bytes(arr);
980
981 Ok(Some(transaction_version))
982 }
983 Ok(None) => Ok(None),
984
985 Err(err) => Err(err),
986 }
987 }
988
989 #[cfg_api_versions(min = 610)]
990 pub fn update_metadata_version(&self) {
991 // The param is transformed by removing the final four bytes from ``param`` and reading
992 // those as a little-Endian 32-bit integer to get a position ``pos``.
993 // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
994 // versionstamp of the transaction used. The first byte of the parameter is position 0.
995 // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
996 let param = vec![0u8; 14];
997 self.atomic_op(
998 METADATA_VERSION_KEY,
999 param.as_slice(),
1000 options::MutationType::SetVersionstampedValue,
1001 )
1002 }
1003
1004 /// Reset transaction to its initial state.
1005 ///
1006 /// In order to protect against a race condition with cancel(), this call require a mutable
1007 /// access to the transaction.
1008 ///
1009 /// This is similar to dropping the transaction and creating a new one.
1010 ///
1011 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1012 /// transaction has already been reset.
1013 pub fn reset(&mut self) {
1014 unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1015 }
1016
1017 /// Adds a conflict range to a transaction without performing the associated read or write.
1018 ///
1019 /// # Note
1020 ///
1021 /// Most applications will use the serializable isolation that transactions provide by default
1022 /// and will not need to manipulate conflict ranges.
1023 pub fn add_conflict_range(
1024 &self,
1025 begin: &[u8],
1026 end: &[u8],
1027 ty: options::ConflictRangeType,
1028 ) -> FdbResult<()> {
1029 error::eval(unsafe {
1030 fdb_sys::fdb_transaction_add_conflict_range(
1031 self.inner.as_ptr(),
1032 begin.as_ptr(),
1033 fdb_len(begin.len(), "begin"),
1034 end.as_ptr(),
1035 fdb_len(end.len(), "end"),
1036 ty.code(),
1037 )
1038 })
1039 }
1040}
1041
1042impl Drop for Transaction {
1043 fn drop(&mut self) {
1044 unsafe {
1045 fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1046 }
1047 }
1048}
1049
1050/// A retryable transaction, generated by Database.run
1051#[derive(Clone)]
1052pub struct RetryableTransaction {
1053 inner: Arc<Transaction>,
1054}
1055
1056impl Deref for RetryableTransaction {
1057 type Target = Transaction;
1058 fn deref(&self) -> &Transaction {
1059 self.inner.deref()
1060 }
1061}
1062
1063impl RetryableTransaction {
1064 pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1065 RetryableTransaction { inner: Arc::new(t) }
1066 }
1067
1068 pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1069 // checking weak references
1070 if Arc::weak_count(&self.inner) != 0 {
1071 return Err(FdbBindingError::ReferenceToTransactionKept);
1072 }
1073 Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1074 }
1075
1076 pub(crate) async fn on_error(
1077 self,
1078 err: FdbError,
1079 ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1080 Ok(self
1081 .take()?
1082 .on_error(err)
1083 .await
1084 .map(RetryableTransaction::new))
1085 }
1086
1087 pub(crate) async fn commit(
1088 self,
1089 ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1090 Ok(self.take()?.commit().await)
1091 }
1092}