Spaces:
Sleeping
Sleeping
File size: 5,231 Bytes
"""Parser for Prometheus text format metrics."""
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class MetricSample:
    """One parsed sample line from a Prometheus text exposition.

    Bundles the metric name, the label set attached to the sample, the
    numeric value and, when the line carried one, the trailing timestamp.
    """

    # Metric name exactly as it appeared on the line.
    name: str
    # Label name -> label value; empty when the line had no label block.
    labels: Dict[str, str]
    # Sample value (may be NaN or +/-infinity).
    value: float
    # Trailing timestamp converted to float, or None when the line had none.
    timestamp: Optional[float] = None
def parse_prometheus_metrics(text: str) -> Dict[str, List[MetricSample]]:
    """
    Turn a Prometheus text exposition into samples grouped by metric name.

    Blank lines and lines starting with "#" are ignored, as are lines that
    the line parser cannot interpret (it returns None for those).

    Args:
        text: Raw Prometheus metrics text

    Returns:
        Dictionary mapping each metric name to its samples, in the order
        they appeared in the input
    """
    grouped: Dict[str, List[MetricSample]] = {}

    for raw_line in text.strip().split("\n"):
        entry = raw_line.strip()

        # Nothing to parse on blank lines or comment lines.
        if entry == "" or entry.startswith("#"):
            continue

        parsed = _parse_metric_line(entry)
        if not parsed:
            continue

        grouped.setdefault(parsed.name, []).append(parsed)

    return grouped
# Grammar of one sample line: name, optional {label block}, value and an
# optional integer timestamp.  Inside the label block a quoted value is
# consumed as a single unit (honouring backslash escapes), so a "}" that
# appears inside a label value does not terminate the block early.
_METRIC_LINE_RE = re.compile(
    r'^([a-zA-Z_:][a-zA-Z0-9_:]*)'
    r'(?:\{((?:[^"}]|"(?:[^"\\]|\\.)*")*)\})?'
    r'\s+(\S+)'
    r'(?:\s+([+-]?\d+))?$'
)


def _parse_metric_line(line: str) -> Optional[MetricSample]:
    """
    Parse a single Prometheus metric line.

    Accepted shapes:
        metric_name{label1="value1",label2="value2"} value [timestamp]
        metric_name value [timestamp]

    Args:
        line: One exposition line, already stripped of surrounding whitespace

    Returns:
        The parsed sample, or None when the line does not match the
        sample grammar

    Raises:
        ValueError: If the value token cannot be converted to a float
    """
    match = _METRIC_LINE_RE.match(line)
    if match is None:
        return None

    name, labels_str, value_str, timestamp_str = match.groups()

    # The label group is None when the line has no "{...}" block and an
    # empty string for "{}"; both mean "no labels".
    labels = _parse_labels(labels_str) if labels_str else {}
    value = _parse_value(value_str)
    timestamp = float(timestamp_str) if timestamp_str else None

    return MetricSample(name=name, labels=labels, value=value, timestamp=timestamp)
def _parse_labels(labels_str: str) -> Dict[str, str]:
"""Parse label string into dictionary."""
labels = {}
# Pattern: key="value"
for match in re.finditer(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"', labels_str):
labels[match.group(1)] = match.group(2)
return labels
def _parse_value(value_str: str) -> float:
"""Parse metric value, handling special cases."""
if value_str.lower() == "nan":
return float("nan")
if value_str.lower() == "+inf":
return float("inf")
if value_str.lower() == "-inf":
return float("-inf")
return float(value_str)
def get_metric_value(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Look up the value of the first sample matching a name and label filter.

    A sample matches when every key/value pair in ``labels`` is present in
    the sample's own labels; extra labels on the sample are ignored. With
    no filter the first sample recorded under ``name`` is used.

    Args:
        metrics: Parsed metrics dictionary
        name: Metric name
        labels: Optional label filter

    Returns:
        Value of the first matching sample, or None if there is none
    """
    if name not in metrics:
        return None

    # An absent filter behaves like an empty one: every sample matches.
    required = labels.items() if labels is not None else ()

    return next(
        (
            candidate.value
            for candidate in metrics[name]
            if all(candidate.labels.get(key) == wanted for key, wanted in required)
        ),
        None,
    )
def get_histogram_quantile(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    quantile: float,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Estimate a quantile from the cumulative buckets of a Prometheus histogram.

    Bucket counts of all matching series are summed per ``le`` boundary, so
    calling this without a label filter on a histogram that has several
    label sets yields the quantile of the combined distribution.

    The total observation count is taken from the ``+Inf`` bucket when it is
    present; otherwise the highest finite bucket is used. The result is
    linearly interpolated inside the bucket that contains the target rank,
    with 0 as the lower bound of the first bucket. When the target rank
    lies above the highest finite bucket, that bucket's upper bound is
    returned, which is also what PromQL's histogram_quantile does.

    Args:
        metrics: Parsed metrics dictionary
        name: Base metric name (without _bucket suffix)
        quantile: Desired quantile (e.g., 0.95 for P95); not range-checked
        labels: Optional label filter

    Returns:
        Approximate quantile value, or None when there are no matching
        finite buckets or no observations

    Raises:
        ValueError: If a bucket's ``le`` label is not a valid float
    """
    bucket_name = f"{name}_bucket"
    if bucket_name not in metrics:
        return None

    # Cumulative count per upper bound, summed over every matching series.
    counts_by_bound: Dict[float, float] = {}
    for sample in metrics[bucket_name]:
        if labels and not all(sample.labels.get(k) == v for k, v in labels.items()):
            continue
        le = sample.labels.get("le")
        if not le:
            continue
        # float() understands every spelling of infinity ("+Inf", "Inf", ...).
        bound = float(le)
        counts_by_bound[bound] = counts_by_bound.get(bound, 0) + sample.value

    # The +Inf bucket carries the true total but has no usable upper bound.
    inf_count = counts_by_bound.pop(float("inf"), None)

    buckets = sorted(counts_by_bound.items())
    if not buckets:
        return None

    total = inf_count if inf_count is not None else buckets[-1][1]
    if total == 0:
        return None

    # Find bucket containing quantile
    target = quantile * total
    prev_bound = 0
    prev_count = 0
    for bound, count in buckets:
        if count >= target:
            # Linear interpolation within bucket
            fraction = (target - prev_count) / (count - prev_count) if count > prev_count else 0
            return prev_bound + fraction * (bound - prev_bound)
        prev_bound = bound
        prev_count = count

    # Target rank falls into the +Inf bucket (or quantile > 1).
    return buckets[-1][0]
|