Spaces:
Running
Running
| """Parser for Prometheus text format metrics.""" | |
| import re | |
| from typing import Dict, List, Any, Optional | |
| from dataclasses import dataclass | |
| class MetricSample: | |
| """A single metric sample with labels and value.""" | |
| name: str | |
| labels: Dict[str, str] | |
| value: float | |
| timestamp: Optional[float] = None | |
| def parse_prometheus_metrics(text: str) -> Dict[str, List[MetricSample]]: | |
| """ | |
| Parse Prometheus text format into structured metrics. | |
| Args: | |
| text: Raw Prometheus metrics text | |
| Returns: | |
| Dictionary mapping metric names to lists of samples | |
| """ | |
| metrics: Dict[str, List[MetricSample]] = {} | |
| for line in text.strip().split("\n"): | |
| line = line.strip() | |
| # Skip empty lines and comments | |
| if not line or line.startswith("#"): | |
| continue | |
| # Parse metric line | |
| sample = _parse_metric_line(line) | |
| if sample: | |
| if sample.name not in metrics: | |
| metrics[sample.name] = [] | |
| metrics[sample.name].append(sample) | |
| return metrics | |
| def _parse_metric_line(line: str) -> Optional[MetricSample]: | |
| """Parse a single Prometheus metric line.""" | |
| # Pattern: metric_name{label1="value1",label2="value2"} value [timestamp] | |
| # Or: metric_name value [timestamp] | |
| # Match with labels | |
| match = re.match( | |
| r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([^\s]+)(?:\s+(\d+))?$', | |
| line | |
| ) | |
| if match: | |
| name = match.group(1) | |
| labels_str = match.group(2) | |
| value_str = match.group(3) | |
| timestamp_str = match.group(4) | |
| labels = _parse_labels(labels_str) | |
| value = _parse_value(value_str) | |
| timestamp = float(timestamp_str) if timestamp_str else None | |
| return MetricSample(name=name, labels=labels, value=value, timestamp=timestamp) | |
| # Match without labels | |
| match = re.match( | |
| r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([^\s]+)(?:\s+(\d+))?$', | |
| line | |
| ) | |
| if match: | |
| name = match.group(1) | |
| value_str = match.group(2) | |
| timestamp_str = match.group(3) | |
| value = _parse_value(value_str) | |
| timestamp = float(timestamp_str) if timestamp_str else None | |
| return MetricSample(name=name, labels={}, value=value, timestamp=timestamp) | |
| return None | |
| def _parse_labels(labels_str: str) -> Dict[str, str]: | |
| """Parse label string into dictionary.""" | |
| labels = {} | |
| # Pattern: key="value" | |
| for match in re.finditer(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"', labels_str): | |
| labels[match.group(1)] = match.group(2) | |
| return labels | |
| def _parse_value(value_str: str) -> float: | |
| """Parse metric value, handling special cases.""" | |
| if value_str.lower() == "nan": | |
| return float("nan") | |
| if value_str.lower() == "+inf": | |
| return float("inf") | |
| if value_str.lower() == "-inf": | |
| return float("-inf") | |
| return float(value_str) | |
| def get_metric_value( | |
| metrics: Dict[str, List[MetricSample]], | |
| name: str, | |
| labels: Optional[Dict[str, str]] = None | |
| ) -> Optional[float]: | |
| """ | |
| Get a specific metric value by name and optional labels. | |
| Args: | |
| metrics: Parsed metrics dictionary | |
| name: Metric name | |
| labels: Optional label filter | |
| Returns: | |
| Metric value or None if not found | |
| """ | |
| if name not in metrics: | |
| return None | |
| for sample in metrics[name]: | |
| if labels is None: | |
| return sample.value | |
| # Check if all specified labels match | |
| if all(sample.labels.get(k) == v for k, v in labels.items()): | |
| return sample.value | |
| return None | |
| def get_histogram_quantile( | |
| metrics: Dict[str, List[MetricSample]], | |
| name: str, | |
| quantile: float, | |
| labels: Optional[Dict[str, str]] = None | |
| ) -> Optional[float]: | |
| """ | |
| Get histogram quantile value from Prometheus histogram. | |
| Args: | |
| metrics: Parsed metrics dictionary | |
| name: Base metric name (without _bucket suffix) | |
| quantile: Desired quantile (e.g., 0.95 for P95) | |
| labels: Optional label filter | |
| Returns: | |
| Approximate quantile value or None | |
| """ | |
| bucket_name = f"{name}_bucket" | |
| if bucket_name not in metrics: | |
| return None | |
| # Get all buckets | |
| buckets = [] | |
| for sample in metrics[bucket_name]: | |
| if labels and not all(sample.labels.get(k) == v for k, v in labels.items()): | |
| continue | |
| le = sample.labels.get("le") | |
| if le and le != "+Inf": | |
| buckets.append((float(le), sample.value)) | |
| if not buckets: | |
| return None | |
| # Sort by bucket boundary | |
| buckets.sort(key=lambda x: x[0]) | |
| # Get total count | |
| total = buckets[-1][1] if buckets else 0 | |
| if total == 0: | |
| return None | |
| # Find bucket containing quantile | |
| target = quantile * total | |
| prev_bound = 0 | |
| prev_count = 0 | |
| for bound, count in buckets: | |
| if count >= target: | |
| # Linear interpolation within bucket | |
| fraction = (target - prev_count) / (count - prev_count) if count > prev_count else 0 | |
| return prev_bound + fraction * (bound - prev_bound) | |
| prev_bound = bound | |
| prev_count = count | |
| return buckets[-1][0] if buckets else None | |