"""Parser for Prometheus text format metrics.""" import re from typing import Dict, List, Any, Optional from dataclasses import dataclass @dataclass class MetricSample: """A single metric sample with labels and value.""" name: str labels: Dict[str, str] value: float timestamp: Optional[float] = None def parse_prometheus_metrics(text: str) -> Dict[str, List[MetricSample]]: """ Parse Prometheus text format into structured metrics. Args: text: Raw Prometheus metrics text Returns: Dictionary mapping metric names to lists of samples """ metrics: Dict[str, List[MetricSample]] = {} for line in text.strip().split("\n"): line = line.strip() # Skip empty lines and comments if not line or line.startswith("#"): continue # Parse metric line sample = _parse_metric_line(line) if sample: if sample.name not in metrics: metrics[sample.name] = [] metrics[sample.name].append(sample) return metrics def _parse_metric_line(line: str) -> Optional[MetricSample]: """Parse a single Prometheus metric line.""" # Pattern: metric_name{label1="value1",label2="value2"} value [timestamp] # Or: metric_name value [timestamp] # Match with labels match = re.match( r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([^\s]+)(?:\s+(\d+))?$', line ) if match: name = match.group(1) labels_str = match.group(2) value_str = match.group(3) timestamp_str = match.group(4) labels = _parse_labels(labels_str) value = _parse_value(value_str) timestamp = float(timestamp_str) if timestamp_str else None return MetricSample(name=name, labels=labels, value=value, timestamp=timestamp) # Match without labels match = re.match( r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([^\s]+)(?:\s+(\d+))?$', line ) if match: name = match.group(1) value_str = match.group(2) timestamp_str = match.group(3) value = _parse_value(value_str) timestamp = float(timestamp_str) if timestamp_str else None return MetricSample(name=name, labels={}, value=value, timestamp=timestamp) return None def _parse_labels(labels_str: str) -> Dict[str, str]: """Parse label string into dictionary.""" labels = {} # Pattern: key="value" for match in re.finditer(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"', labels_str): labels[match.group(1)] = match.group(2) return labels def _parse_value(value_str: str) -> float: """Parse metric value, handling special cases.""" if value_str.lower() == "nan": return float("nan") if value_str.lower() == "+inf": return float("inf") if value_str.lower() == "-inf": return float("-inf") return float(value_str) def get_metric_value( metrics: Dict[str, List[MetricSample]], name: str, labels: Optional[Dict[str, str]] = None ) -> Optional[float]: """ Get a specific metric value by name and optional labels. Args: metrics: Parsed metrics dictionary name: Metric name labels: Optional label filter Returns: Metric value or None if not found """ if name not in metrics: return None for sample in metrics[name]: if labels is None: return sample.value # Check if all specified labels match if all(sample.labels.get(k) == v for k, v in labels.items()): return sample.value return None def get_histogram_quantile( metrics: Dict[str, List[MetricSample]], name: str, quantile: float, labels: Optional[Dict[str, str]] = None ) -> Optional[float]: """ Get histogram quantile value from Prometheus histogram. Args: metrics: Parsed metrics dictionary name: Base metric name (without _bucket suffix) quantile: Desired quantile (e.g., 0.95 for P95) labels: Optional label filter Returns: Approximate quantile value or None """ bucket_name = f"{name}_bucket" if bucket_name not in metrics: return None # Get all buckets buckets = [] for sample in metrics[bucket_name]: if labels and not all(sample.labels.get(k) == v for k, v in labels.items()): continue le = sample.labels.get("le") if le and le != "+Inf": buckets.append((float(le), sample.value)) if not buckets: return None # Sort by bucket boundary buckets.sort(key=lambda x: x[0]) # Get total count total = buckets[-1][1] if buckets else 0 if total == 0: return None # Find bucket containing quantile target = quantile * total prev_bound = 0 prev_count = 0 for bound, count in buckets: if count >= target: # Linear interpolation within bucket fraction = (target - prev_count) / (count - prev_count) if count > prev_count else 0 return prev_bound + fraction * (bound - prev_bound) prev_bound = bound prev_count = count return buckets[-1][0] if buckets else None