1use crate::transaction::ConflictingKeyRange;
2use std::collections::HashMap;
3use std::sync::{Arc, Mutex};
4
5pub type Labels = Vec<(String, String)>;
7
8#[derive(Clone, Hash, Eq, PartialEq, Debug)]
10pub struct MetricKey {
11 pub name: String,
12 pub labels: Labels,
13}
14
15impl MetricKey {
16 pub fn new(name: &str, labels: &[(&str, &str)]) -> Self {
25 let mut sorted_labels: Labels = labels
27 .iter()
28 .map(|(k, v)| (k.to_string(), v.to_string()))
29 .collect();
30
31 sorted_labels.sort_by(|a, b| a.0.cmp(&b.0));
33
34 Self {
35 name: name.to_string(),
36 labels: sorted_labels,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Default)]
45pub struct TransactionMetrics {
46 pub metrics: Arc<Mutex<MetricsReport>>,
48}
49
50#[derive(Debug, Default, Clone)]
52pub struct TransactionInfo {
53 pub retries: u64,
55 pub conflict_count: u64,
57 pub read_version: Option<i64>,
59 pub commit_version: Option<i64>,
61}
62
63#[derive(Debug, Clone, Default)]
66pub struct MetricsReport {
67 pub current: CounterMetrics,
69 pub total: CounterMetrics,
71 pub time: TimingMetrics,
73 pub custom_metrics: HashMap<MetricKey, u64>,
75 pub transaction: TransactionInfo,
77 pub conflicting_keys: Vec<ConflictingKeyRange>,
80}
81
82impl MetricsReport {
83 pub fn set_read_version(&mut self, version: i64) {
88 self.transaction.read_version = Some(version);
89 }
90
91 pub fn set_commit_version(&mut self, version: i64) {
96 self.transaction.commit_version = Some(version);
97 }
98
99 pub fn increment_retries(&mut self) {
101 self.transaction.retries += 1;
102 }
103}
104
105#[derive(Debug, Default, Clone)]
107pub struct TimingMetrics {
108 pub commit_execution_ms: u64,
110 pub on_error_execution_ms: Vec<u64>,
112 pub total_execution_ms: u64,
114}
115
116impl TimingMetrics {
117 pub fn record_commit_time(&mut self, duration_ms: u64) {
122 self.commit_execution_ms = duration_ms;
123 }
124
125 pub fn add_error_time(&mut self, duration_ms: u64) {
130 self.on_error_execution_ms.push(duration_ms);
131 }
132
133 pub fn set_execution_time(&mut self, duration_ms: u64) {
138 self.total_execution_ms = duration_ms;
139 }
140
141 pub fn get_total_error_time(&self) -> u64 {
146 self.on_error_execution_ms.iter().sum()
147 }
148}
149
150pub enum FdbCommand {
152 Atomic,
154 Clear,
156 ClearRange,
158 Get(u64, u64),
160 GetRange(u64, u64),
161 Set(u64),
162}
163
164const INCREMENT: u64 = 1;
165
166impl TransactionMetrics {
167 pub fn new() -> Self {
169 Self {
170 metrics: Arc::new(Mutex::new(MetricsReport::default())),
171 }
172 }
173
174 pub fn set_read_version(&self, version: i64) {
179 let mut data = self
180 .metrics
181 .lock()
182 .unwrap_or_else(|poisoned| poisoned.into_inner());
183 data.set_read_version(version);
184 }
185
186 pub fn set_commit_version(&self, version: i64) {
191 let mut data = self
192 .metrics
193 .lock()
194 .unwrap_or_else(|poisoned| poisoned.into_inner());
195 data.set_commit_version(version);
196 }
197
198 pub fn reset_current(&self) {
204 let mut data = self
205 .metrics
206 .lock()
207 .unwrap_or_else(|poisoned| poisoned.into_inner());
208 data.increment_retries();
209 data.current = CounterMetrics::default();
210 data.custom_metrics.clear();
211 }
212
213 pub fn increment_retries(&self) {
215 let mut data = self
216 .metrics
217 .lock()
218 .unwrap_or_else(|poisoned| poisoned.into_inner());
219 data.increment_retries();
220 }
221
222 pub fn increment_conflict_count(&self) {
224 let mut data = self
225 .metrics
226 .lock()
227 .unwrap_or_else(|poisoned| poisoned.into_inner());
228 data.transaction.conflict_count += 1;
229 }
230
231 pub fn report_metrics(&self, fdb_command: FdbCommand) {
238 let mut data = self
239 .metrics
240 .lock()
241 .unwrap_or_else(|poisoned| poisoned.into_inner());
242 data.current.increment(&fdb_command);
243 data.total.increment(&fdb_command);
244 }
245
246 pub fn get_retries(&self) -> u64 {
251 let data = self
252 .metrics
253 .lock()
254 .unwrap_or_else(|poisoned| poisoned.into_inner());
255 data.transaction.retries
256 }
257
258 pub fn get_metrics_data(&self) -> MetricsReport {
263 let data = self
264 .metrics
265 .lock()
266 .unwrap_or_else(|poisoned| poisoned.into_inner());
267 data.clone()
268 }
269
270 pub fn get_transaction_info(&self) -> TransactionInfo {
275 let data = self
276 .metrics
277 .lock()
278 .unwrap_or_else(|poisoned| poisoned.into_inner());
279 data.transaction.clone()
280 }
281
282 pub fn set_custom(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
289 let mut data = self
290 .metrics
291 .lock()
292 .unwrap_or_else(|poisoned| poisoned.into_inner());
293 let key = MetricKey::new(name, labels);
294 data.custom_metrics.insert(key.clone(), value);
295 }
296
297 pub fn increment_custom(&self, name: &str, amount: u64, labels: &[(&str, &str)]) {
304 let mut data = self
305 .metrics
306 .lock()
307 .unwrap_or_else(|poisoned| poisoned.into_inner());
308 let key = MetricKey::new(name, labels);
309 *data.custom_metrics.entry(key.clone()).or_insert(0) += amount;
311 }
312
313 pub fn set_conflicting_keys(&self, keys: Vec<ConflictingKeyRange>) {
315 let mut data = self
316 .metrics
317 .lock()
318 .unwrap_or_else(|poisoned| poisoned.into_inner());
319 data.conflicting_keys = keys;
320 }
321
322 pub fn record_commit_time(&self, duration_ms: u64) {
327 let mut data = self
328 .metrics
329 .lock()
330 .unwrap_or_else(|poisoned| poisoned.into_inner());
331 data.time.commit_execution_ms = duration_ms;
332 }
333
334 pub fn add_error_time(&self, duration_ms: u64) {
339 let mut data = self
340 .metrics
341 .lock()
342 .unwrap_or_else(|poisoned| poisoned.into_inner());
343 data.time.on_error_execution_ms.push(duration_ms);
344 }
345
346 pub fn set_execution_time(&self, duration_ms: u64) {
351 let mut data = self
352 .metrics
353 .lock()
354 .unwrap_or_else(|poisoned| poisoned.into_inner());
355 data.time.set_execution_time(duration_ms);
356 }
357
358 pub fn get_total_error_time(&self) -> u64 {
363 let data = self
364 .metrics
365 .lock()
366 .unwrap_or_else(|poisoned| poisoned.into_inner());
367 data.time.on_error_execution_ms.iter().sum()
368 }
369}
370
371#[derive(Debug, Default, Clone)]
378pub struct CounterMetrics {
379 pub call_atomic_op: u64,
382 pub call_clear: u64,
385 pub call_clear_range: u64,
388 pub call_get: u64,
391 pub keys_values_fetched: u64,
393 pub bytes_read: u64,
395 pub call_set: u64,
398 pub bytes_written: u64,
400}
401
402impl CounterMetrics {
403 pub fn increment(&mut self, fdb_command: &FdbCommand) {
410 match fdb_command {
411 FdbCommand::Atomic => self.call_atomic_op += INCREMENT,
412 FdbCommand::Clear => self.call_clear += INCREMENT,
413 FdbCommand::ClearRange => self.call_clear_range += INCREMENT,
414 FdbCommand::Get(bytes_count, kv_fetched) => {
415 self.keys_values_fetched += *kv_fetched;
416 self.bytes_read += *bytes_count;
417 self.call_get += INCREMENT
418 }
419 FdbCommand::GetRange(bytes_count, keys_values_fetched) => {
420 self.keys_values_fetched += *keys_values_fetched;
421 self.bytes_read += *bytes_count;
422 }
423 FdbCommand::Set(bytes_count) => {
424 self.bytes_written += *bytes_count;
425 self.call_set += INCREMENT
426 }
427 }
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434 use std::collections::HashSet;
435
436 #[test]
437 fn test_metric_key_equality() {
438 let key1 = MetricKey::new("counter", &[("region", "us-west"), ("service", "api")]);
440 let key2 = MetricKey::new("counter", &[("service", "api"), ("region", "us-west")]);
441
442 assert_eq!(key1, key2);
444
445 let key3 = MetricKey::new("counter", &[("region", "us-east"), ("service", "api")]);
447 assert_ne!(key1, key3);
448
449 let key4 = MetricKey::new("counter", &[("zone", "us-west"), ("service", "api")]);
451 assert_ne!(key1, key4);
452
453 let key5 = MetricKey::new("timer", &[("region", "us-west"), ("service", "api")]);
455 assert_ne!(key1, key5);
456 }
457
458 #[test]
459 fn test_metric_key_in_hashmap() {
460 let mut metrics = HashMap::new();
461
462 metrics.insert(
464 MetricKey::new("counter", &[("region", "us-west"), ("service", "api")]),
465 100,
466 );
467 metrics.insert(
468 MetricKey::new("counter", &[("region", "us-east"), ("service", "api")]),
469 200,
470 );
471 metrics.insert(
472 MetricKey::new("timer", &[("region", "us-west"), ("service", "api")]),
473 300,
474 );
475
476 assert_eq!(
478 metrics.get(&MetricKey::new(
479 "counter",
480 &[("region", "us-west"), ("service", "api")]
481 )),
482 Some(&100)
483 );
484
485 assert_eq!(
487 metrics.get(&MetricKey::new(
488 "counter",
489 &[("service", "api"), ("region", "us-west")]
490 )),
491 Some(&100)
492 );
493
494 assert_eq!(
496 metrics.get(&MetricKey::new(
497 "counter",
498 &[("region", "us-east"), ("service", "api")]
499 )),
500 Some(&200)
501 );
502
503 assert_eq!(
505 metrics.get(&MetricKey::new(
506 "timer",
507 &[("region", "us-west"), ("service", "api")]
508 )),
509 Some(&300)
510 );
511 }
512
513 #[test]
514 fn test_metric_key_label_order_independence() {
515 let mut unique_keys = HashSet::new();
517
518 unique_keys.insert(MetricKey::new(
520 "counter",
521 &[("a", "1"), ("b", "2"), ("c", "3")],
522 ));
523 unique_keys.insert(MetricKey::new(
524 "counter",
525 &[("a", "1"), ("c", "3"), ("b", "2")],
526 ));
527 unique_keys.insert(MetricKey::new(
528 "counter",
529 &[("b", "2"), ("a", "1"), ("c", "3")],
530 ));
531 unique_keys.insert(MetricKey::new(
532 "counter",
533 &[("b", "2"), ("c", "3"), ("a", "1")],
534 ));
535 unique_keys.insert(MetricKey::new(
536 "counter",
537 &[("c", "3"), ("a", "1"), ("b", "2")],
538 ));
539 unique_keys.insert(MetricKey::new(
540 "counter",
541 &[("c", "3"), ("b", "2"), ("a", "1")],
542 ));
543
544 assert_eq!(unique_keys.len(), 1);
546 }
547
548 #[test]
549 fn test_custom_metrics_operations() {
550 let metrics = TransactionMetrics::new();
552
553 metrics.set_custom(
555 "api_calls",
556 100,
557 &[("endpoint", "users"), ("method", "GET")],
558 );
559
560 let data = metrics.get_metrics_data();
562 let key = MetricKey::new("api_calls", &[("endpoint", "users"), ("method", "GET")]);
563 assert_eq!(data.custom_metrics.get(&key).copied(), Some(100));
564
565 metrics.increment_custom("api_calls", 50, &[("endpoint", "users"), ("method", "GET")]);
567
568 let data = metrics.get_metrics_data();
570 assert_eq!(data.custom_metrics.get(&key).copied(), Some(150));
571
572 metrics.set_custom(
574 "api_calls",
575 200,
576 &[("endpoint", "users"), ("method", "GET")],
577 );
578
579 let data = metrics.get_metrics_data();
581 assert_eq!(data.custom_metrics.get(&key).copied(), Some(200));
582
583 metrics.increment_custom("api_calls", 75, &[("method", "GET"), ("endpoint", "users")]);
585
586 let data = metrics.get_metrics_data();
588 assert_eq!(data.custom_metrics.get(&key).copied(), Some(275));
589 }
590}