1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
/// Owned list of `(label name, label value)` pairs attached to a metric.
pub type Labels = Vec<(String, String)>;
6
7#[derive(Clone, Hash, Eq, PartialEq, Debug)]
9pub struct MetricKey {
10 pub name: String,
11 pub labels: Labels,
12}
13
14impl MetricKey {
15 pub fn new(name: &str, labels: &[(&str, &str)]) -> Self {
24 let mut sorted_labels: Labels = labels
26 .iter()
27 .map(|(k, v)| (k.to_string(), v.to_string()))
28 .collect();
29
30 sorted_labels.sort_by(|a, b| a.0.cmp(&b.0));
32
33 Self {
34 name: name.to_string(),
35 labels: sorted_labels,
36 }
37 }
38}
39
40#[derive(Debug, Clone, Default)]
44pub struct TransactionMetrics {
45 pub metrics: Arc<Mutex<MetricsReport>>,
47}
48
/// Bookkeeping about the transaction itself, as opposed to the operations it ran.
///
/// The default value has zero retries and no versions recorded.
#[derive(Clone, Debug, Default)]
pub struct TransactionInfo {
    /// Retry counter.
    pub retries: u64,
    /// Read version; `None` until one has been recorded.
    pub read_version: Option<i64>,
    /// Commit version; `None` until one has been recorded.
    pub commit_version: Option<i64>,
}
59
60#[derive(Debug, Clone, Default)]
63pub struct MetricsReport {
64 pub current: CounterMetrics,
66 pub total: CounterMetrics,
68 pub time: TimingMetrics,
70 pub custom_metrics: HashMap<MetricKey, u64>,
72 pub transaction: TransactionInfo,
74}
75
76impl MetricsReport {
77 pub fn set_read_version(&mut self, version: i64) {
82 self.transaction.read_version = Some(version);
83 }
84
85 pub fn set_commit_version(&mut self, version: i64) {
90 self.transaction.commit_version = Some(version);
91 }
92
93 pub fn increment_retries(&mut self) {
95 self.transaction.retries += 1;
96 }
97}
98
/// Durations recorded for a transaction (milliseconds, going by the `_ms` suffixes).
#[derive(Clone, Debug, Default)]
pub struct TimingMetrics {
    /// Most recently recorded commit duration.
    pub commit_execution_ms: u64,
    /// One entry per recorded error-handling duration, in insertion order.
    pub on_error_execution_ms: Vec<u64>,
    /// Most recently recorded overall execution duration.
    pub total_execution_ms: u64,
}
109
110impl TimingMetrics {
111 pub fn record_commit_time(&mut self, duration_ms: u64) {
116 self.commit_execution_ms = duration_ms;
117 }
118
119 pub fn add_error_time(&mut self, duration_ms: u64) {
124 self.on_error_execution_ms.push(duration_ms);
125 }
126
127 pub fn set_execution_time(&mut self, duration_ms: u64) {
132 self.total_execution_ms = duration_ms;
133 }
134
135 pub fn get_total_error_time(&self) -> u64 {
140 self.on_error_execution_ms.iter().sum()
141 }
142}
143
/// A database operation to be accounted for in [`CounterMetrics`].
///
/// The numeric payloads are interpreted by `CounterMetrics::increment`.
/// The type carries only plain integers, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FdbCommand {
    /// An atomic operation.
    Atomic,
    /// A clear.
    Clear,
    /// A clear-range.
    ClearRange,
    /// A get: `(bytes read, key-values fetched)`.
    Get(u64, u64),
    /// A get-range: `(bytes read, key-values fetched)`.
    GetRange(u64, u64),
    /// A set: `(bytes written)`.
    Set(u64),
}
157
/// Amount added to a call counter each time a matching command is recorded.
const INCREMENT: u64 = 1;
159
160impl TransactionMetrics {
161 pub fn new() -> Self {
163 Self {
164 metrics: Arc::new(Mutex::new(MetricsReport::default())),
165 }
166 }
167
168 pub fn set_read_version(&self, version: i64) {
173 let mut data = self
174 .metrics
175 .lock()
176 .unwrap_or_else(|poisoned| poisoned.into_inner());
177 data.set_read_version(version);
178 }
179
180 pub fn set_commit_version(&self, version: i64) {
185 let mut data = self
186 .metrics
187 .lock()
188 .unwrap_or_else(|poisoned| poisoned.into_inner());
189 data.set_commit_version(version);
190 }
191
192 pub fn reset_current(&self) {
198 let mut data = self
199 .metrics
200 .lock()
201 .unwrap_or_else(|poisoned| poisoned.into_inner());
202 data.increment_retries();
203 data.current = CounterMetrics::default();
204 data.custom_metrics.clear();
205 }
206
207 pub fn increment_retries(&self) {
209 let mut data = self
210 .metrics
211 .lock()
212 .unwrap_or_else(|poisoned| poisoned.into_inner());
213 data.increment_retries();
214 }
215
216 pub fn report_metrics(&self, fdb_command: FdbCommand) {
223 let mut data = self
224 .metrics
225 .lock()
226 .unwrap_or_else(|poisoned| poisoned.into_inner());
227 data.current.increment(&fdb_command);
228 data.total.increment(&fdb_command);
229 }
230
231 pub fn get_retries(&self) -> u64 {
236 let data = self
237 .metrics
238 .lock()
239 .unwrap_or_else(|poisoned| poisoned.into_inner());
240 data.transaction.retries
241 }
242
243 pub fn get_metrics_data(&self) -> MetricsReport {
248 let data = self
249 .metrics
250 .lock()
251 .unwrap_or_else(|poisoned| poisoned.into_inner());
252 data.clone()
253 }
254
255 pub fn get_transaction_info(&self) -> TransactionInfo {
260 let data = self
261 .metrics
262 .lock()
263 .unwrap_or_else(|poisoned| poisoned.into_inner());
264 data.transaction.clone()
265 }
266
267 pub fn set_custom(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
274 let mut data = self
275 .metrics
276 .lock()
277 .unwrap_or_else(|poisoned| poisoned.into_inner());
278 let key = MetricKey::new(name, labels);
279 data.custom_metrics.insert(key.clone(), value);
280 }
281
282 pub fn increment_custom(&self, name: &str, amount: u64, labels: &[(&str, &str)]) {
289 let mut data = self
290 .metrics
291 .lock()
292 .unwrap_or_else(|poisoned| poisoned.into_inner());
293 let key = MetricKey::new(name, labels);
294 *data.custom_metrics.entry(key.clone()).or_insert(0) += amount;
296 }
297
298 pub fn record_commit_time(&self, duration_ms: u64) {
303 let mut data = self
304 .metrics
305 .lock()
306 .unwrap_or_else(|poisoned| poisoned.into_inner());
307 data.time.commit_execution_ms = duration_ms;
308 }
309
310 pub fn add_error_time(&self, duration_ms: u64) {
315 let mut data = self
316 .metrics
317 .lock()
318 .unwrap_or_else(|poisoned| poisoned.into_inner());
319 data.time.on_error_execution_ms.push(duration_ms);
320 }
321
322 pub fn set_execution_time(&self, duration_ms: u64) {
327 let mut data = self
328 .metrics
329 .lock()
330 .unwrap_or_else(|poisoned| poisoned.into_inner());
331 data.time.set_execution_time(duration_ms);
332 }
333
334 pub fn get_total_error_time(&self) -> u64 {
339 let data = self
340 .metrics
341 .lock()
342 .unwrap_or_else(|poisoned| poisoned.into_inner());
343 data.time.on_error_execution_ms.iter().sum()
344 }
345}
346
/// Per-operation counters, updated through `CounterMetrics::increment`.
///
/// Every field starts at zero in the default value.
#[derive(Clone, Debug, Default)]
pub struct CounterMetrics {
    /// Number of atomic operations recorded.
    pub call_atomic_op: u64,
    /// Number of clears recorded.
    pub call_clear: u64,
    /// Number of clear-ranges recorded.
    pub call_clear_range: u64,
    /// Number of gets recorded.
    pub call_get: u64,
    /// Key-values fetched, accumulated over gets and get-ranges.
    pub keys_values_fetched: u64,
    /// Bytes read, accumulated over gets and get-ranges.
    pub bytes_read: u64,
    /// Number of sets recorded.
    pub call_set: u64,
    /// Bytes written, accumulated over sets.
    pub bytes_written: u64,
}
377
378impl CounterMetrics {
379 pub fn increment(&mut self, fdb_command: &FdbCommand) {
386 match fdb_command {
387 FdbCommand::Atomic => self.call_atomic_op += INCREMENT,
388 FdbCommand::Clear => self.call_clear += INCREMENT,
389 FdbCommand::ClearRange => self.call_clear_range += INCREMENT,
390 FdbCommand::Get(bytes_count, kv_fetched) => {
391 self.keys_values_fetched += *kv_fetched;
392 self.bytes_read += *bytes_count;
393 self.call_get += INCREMENT
394 }
395 FdbCommand::GetRange(bytes_count, keys_values_fetched) => {
396 self.keys_values_fetched += *keys_values_fetched;
397 self.bytes_read += *bytes_count;
398 }
399 FdbCommand::Set(bytes_count) => {
400 self.bytes_written += *bytes_count;
401 self.call_set += INCREMENT
402 }
403 }
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Keys compare equal regardless of label order, and differ as soon as a
    /// label value, a label name or the metric name differs.
    #[test]
    fn test_metric_key_equality() {
        let baseline = MetricKey::new("counter", &[("region", "us-west"), ("service", "api")]);

        // Same labels, listed the other way round.
        let reordered = MetricKey::new("counter", &[("service", "api"), ("region", "us-west")]);
        assert_eq!(baseline, reordered);

        let differing = [
            // Different label value.
            MetricKey::new("counter", &[("region", "us-east"), ("service", "api")]),
            // Different label name.
            MetricKey::new("counter", &[("zone", "us-west"), ("service", "api")]),
            // Different metric name.
            MetricKey::new("timer", &[("region", "us-west"), ("service", "api")]),
        ];
        for other in differing.iter() {
            assert_ne!(&baseline, other);
        }
    }

    /// Keys work as `HashMap` keys, including lookups with reordered labels.
    #[test]
    fn test_metric_key_in_hashmap() {
        let west = [("region", "us-west"), ("service", "api")];
        let west_reordered = [("service", "api"), ("region", "us-west")];
        let east = [("region", "us-east"), ("service", "api")];

        let mut metrics = HashMap::new();
        metrics.insert(MetricKey::new("counter", &west), 100);
        metrics.insert(MetricKey::new("counter", &east), 200);
        metrics.insert(MetricKey::new("timer", &west), 300);

        assert_eq!(metrics.get(&MetricKey::new("counter", &west)), Some(&100));
        assert_eq!(
            metrics.get(&MetricKey::new("counter", &west_reordered)),
            Some(&100)
        );
        assert_eq!(metrics.get(&MetricKey::new("counter", &east)), Some(&200));
        assert_eq!(metrics.get(&MetricKey::new("timer", &west)), Some(&300));
    }

    /// All six orderings of three labels collapse to a single key.
    #[test]
    fn test_metric_key_label_order_independence() {
        let orderings: [[(&str, &str); 3]; 6] = [
            [("a", "1"), ("b", "2"), ("c", "3")],
            [("a", "1"), ("c", "3"), ("b", "2")],
            [("b", "2"), ("a", "1"), ("c", "3")],
            [("b", "2"), ("c", "3"), ("a", "1")],
            [("c", "3"), ("a", "1"), ("b", "2")],
            [("c", "3"), ("b", "2"), ("a", "1")],
        ];

        let unique_keys: HashSet<MetricKey> = orderings
            .iter()
            .map(|labels| MetricKey::new("counter", &labels[..]))
            .collect();

        assert_eq!(unique_keys.len(), 1);
    }

    /// `set_custom` overwrites, `increment_custom` accumulates, and both find
    /// the same entry whatever order the labels are passed in.
    #[test]
    fn test_custom_metrics_operations() {
        let metrics = TransactionMetrics::new();
        let labels = [("endpoint", "users"), ("method", "GET")];
        let swapped = [("method", "GET"), ("endpoint", "users")];
        let key = MetricKey::new("api_calls", &labels);

        // Reads the metric's current value from a fresh snapshot.
        let current = || metrics.get_metrics_data().custom_metrics.get(&key).copied();

        metrics.set_custom("api_calls", 100, &labels);
        assert_eq!(current(), Some(100));

        metrics.increment_custom("api_calls", 50, &labels);
        assert_eq!(current(), Some(150));

        metrics.set_custom("api_calls", 200, &labels);
        assert_eq!(current(), Some(200));

        metrics.increment_custom("api_calls", 75, &swapped);
        assert_eq!(current(), Some(275));
    }
}