# llm-inference-dashboard/utils/prometheus_parser.py
# jkottu's picture
# Initial commit: LLM Inference Dashboard
# aefabf0
"""Parser for Prometheus text format metrics."""
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class MetricSample:
    """One metric sample: a name, its label set, and a numeric value.

    Attributes:
        name: Metric name the sample belongs to.
        labels: Label names mapped to label values.
        value: Numeric value of the sample.
        timestamp: Optional timestamp attached to the sample; defaults to
            None when none is supplied.
    """

    name: str
    labels: Dict[str, str]
    value: float
    timestamp: Optional[float] = None
def parse_prometheus_metrics(text: str) -> Dict[str, List[MetricSample]]:
    """
    Turn a Prometheus text-format payload into samples grouped by metric name.

    Blank lines and lines starting with "#" are ignored, as are lines for
    which the line parser yields no sample.

    Args:
        text: Raw Prometheus metrics text

    Returns:
        Dictionary mapping each metric name to its samples, in input order
    """
    grouped: Dict[str, List[MetricSample]] = {}

    for raw_line in text.strip().split("\n"):
        entry = raw_line.strip()

        # Only non-empty, non-comment lines can carry a sample.
        carries_sample = bool(entry) and not entry.startswith("#")
        if carries_sample:
            parsed = _parse_metric_line(entry)
            if parsed:
                grouped.setdefault(parsed.name, []).append(parsed)

    return grouped
def _parse_metric_line(line: str) -> Optional[MetricSample]:
"""Parse a single Prometheus metric line."""
# Pattern: metric_name{label1="value1",label2="value2"} value [timestamp]
# Or: metric_name value [timestamp]
# Match with labels
match = re.match(
r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([^\s]+)(?:\s+(\d+))?$',
line
)
if match:
name = match.group(1)
labels_str = match.group(2)
value_str = match.group(3)
timestamp_str = match.group(4)
labels = _parse_labels(labels_str)
value = _parse_value(value_str)
timestamp = float(timestamp_str) if timestamp_str else None
return MetricSample(name=name, labels=labels, value=value, timestamp=timestamp)
# Match without labels
match = re.match(
r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([^\s]+)(?:\s+(\d+))?$',
line
)
if match:
name = match.group(1)
value_str = match.group(2)
timestamp_str = match.group(3)
value = _parse_value(value_str)
timestamp = float(timestamp_str) if timestamp_str else None
return MetricSample(name=name, labels={}, value=value, timestamp=timestamp)
return None
def _parse_labels(labels_str: str) -> Dict[str, str]:
"""Parse label string into dictionary."""
labels = {}
# Pattern: key="value"
for match in re.finditer(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"', labels_str):
labels[match.group(1)] = match.group(2)
return labels
def _parse_value(value_str: str) -> float:
"""Parse metric value, handling special cases."""
if value_str.lower() == "nan":
return float("nan")
if value_str.lower() == "+inf":
return float("inf")
if value_str.lower() == "-inf":
return float("-inf")
return float(value_str)
def get_metric_value(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Look up the value of the first sample of a metric that fits a filter.

    Args:
        metrics: Parsed metrics dictionary
        name: Metric name
        labels: Optional label filter; a sample fits when every given label
            has exactly the given value. Without a filter the first sample
            of the metric is used.

    Returns:
        Value of the first fitting sample, or None if the metric is unknown
        or no sample fits
    """
    series = metrics.get(name, ())

    if labels is None:
        candidates = iter(series)
    else:
        candidates = (
            sample
            for sample in series
            if all(sample.labels.get(key) == wanted for key, wanted in labels.items())
        )

    first_hit = next(candidates, None)
    return None if first_hit is None else first_hit.value
def get_histogram_quantile(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    quantile: float,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Estimate a quantile from the cumulative buckets of a Prometheus histogram.

    Bucket counts of all samples that pass the label filter are summed per
    "le" boundary, so several label sets are aggregated rather than mixed.
    The "+Inf" bucket, when present, supplies the total observation count.
    The result is linearly interpolated inside the bucket containing the
    quantile, with the first bucket assumed to start at 0.

    Args:
        metrics: Parsed metrics dictionary
        name: Base metric name (without _bucket suffix)
        quantile: Desired quantile (e.g., 0.95 for P95)
        labels: Optional label filter

    Returns:
        Approximate quantile value, or None when the histogram is missing,
        has no finite buckets, or has no observations. When the quantile
        lies above the largest finite boundary, that boundary is returned.
    """
    bucket_name = f"{name}_bucket"
    if bucket_name not in metrics:
        return None

    infinity = float("inf")
    finite_counts: Dict[float, float] = {}
    overflow_count = 0.0

    for sample in metrics[bucket_name]:
        if labels and not all(sample.labels.get(k) == v for k, v in labels.items()):
            continue
        le = sample.labels.get("le")
        if not le:
            continue
        bound = float(le)
        if bound == infinity:
            # The +Inf bucket counts every observation of the series.
            overflow_count += sample.value
        else:
            finite_counts[bound] = finite_counts.get(bound, 0.0) + sample.value

    if not finite_counts:
        return None

    # Cumulative buckets ordered by upper boundary.
    buckets = sorted(finite_counts.items())

    # Without a +Inf bucket, fall back to the largest finite bucket's count.
    total = max(buckets[-1][1], overflow_count)
    if total == 0:
        return None

    # Find bucket containing quantile
    target = quantile * total
    prev_bound = 0.0
    prev_count = 0.0
    for index, (bound, count) in enumerate(buckets):
        if count >= target:
            if index == 0 and bound <= 0:
                # No meaningful lower edge to interpolate from; use the
                # bucket's own boundary.
                return bound
            # Linear interpolation within bucket
            fraction = (target - prev_count) / (count - prev_count) if count > prev_count else 0
            return prev_bound + fraction * (bound - prev_bound)
        prev_bound, prev_count = bound, count

    # The quantile falls beyond the last finite bucket (i.e. in +Inf).
    return buckets[-1][0]