File size: 5,231 Bytes
aefabf0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""Parser for Prometheus text format metrics."""

import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass


@dataclass
class MetricSample:
    """A single metric sample with labels and value."""
    name: str
    labels: Dict[str, str]
    value: float
    timestamp: Optional[float] = None


def parse_prometheus_metrics(text: str) -> Dict[str, List[MetricSample]]:
    """
    Parse Prometheus text format into structured metrics.

    Args:
        text: Raw Prometheus metrics text

    Returns:
        Dictionary mapping metric names to lists of samples
    """
    metrics: Dict[str, List[MetricSample]] = {}

    for line in text.strip().split("\n"):
        line = line.strip()

        # Skip empty lines and comments
        if not line or line.startswith("#"):
            continue

        # Parse metric line
        sample = _parse_metric_line(line)
        if sample:
            if sample.name not in metrics:
                metrics[sample.name] = []
            metrics[sample.name].append(sample)

    return metrics


def _parse_metric_line(line: str) -> Optional[MetricSample]:
    """Parse a single Prometheus metric line."""
    # Pattern: metric_name{label1="value1",label2="value2"} value [timestamp]
    # Or: metric_name value [timestamp]

    # Match with labels
    match = re.match(
        r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([^\s]+)(?:\s+(\d+))?$',
        line
    )

    if match:
        name = match.group(1)
        labels_str = match.group(2)
        value_str = match.group(3)
        timestamp_str = match.group(4)

        labels = _parse_labels(labels_str)
        value = _parse_value(value_str)
        timestamp = float(timestamp_str) if timestamp_str else None

        return MetricSample(name=name, labels=labels, value=value, timestamp=timestamp)

    # Match without labels
    match = re.match(
        r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([^\s]+)(?:\s+(\d+))?$',
        line
    )

    if match:
        name = match.group(1)
        value_str = match.group(2)
        timestamp_str = match.group(3)

        value = _parse_value(value_str)
        timestamp = float(timestamp_str) if timestamp_str else None

        return MetricSample(name=name, labels={}, value=value, timestamp=timestamp)

    return None


def _parse_labels(labels_str: str) -> Dict[str, str]:
    """Parse label string into dictionary."""
    labels = {}

    # Pattern: key="value"
    for match in re.finditer(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"', labels_str):
        labels[match.group(1)] = match.group(2)

    return labels


def _parse_value(value_str: str) -> float:
    """Parse metric value, handling special cases."""
    if value_str.lower() == "nan":
        return float("nan")
    if value_str.lower() == "+inf":
        return float("inf")
    if value_str.lower() == "-inf":
        return float("-inf")
    return float(value_str)


def get_metric_value(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Get a specific metric value by name and optional labels.

    Args:
        metrics: Parsed metrics dictionary
        name: Metric name
        labels: Optional label filter

    Returns:
        Metric value or None if not found
    """
    if name not in metrics:
        return None

    for sample in metrics[name]:
        if labels is None:
            return sample.value

        # Check if all specified labels match
        if all(sample.labels.get(k) == v for k, v in labels.items()):
            return sample.value

    return None


def get_histogram_quantile(
    metrics: Dict[str, List[MetricSample]],
    name: str,
    quantile: float,
    labels: Optional[Dict[str, str]] = None
) -> Optional[float]:
    """
    Get histogram quantile value from Prometheus histogram.

    Args:
        metrics: Parsed metrics dictionary
        name: Base metric name (without _bucket suffix)
        quantile: Desired quantile (e.g., 0.95 for P95)
        labels: Optional label filter

    Returns:
        Approximate quantile value or None
    """
    bucket_name = f"{name}_bucket"
    if bucket_name not in metrics:
        return None

    # Get all buckets
    buckets = []
    for sample in metrics[bucket_name]:
        if labels and not all(sample.labels.get(k) == v for k, v in labels.items()):
            continue
        le = sample.labels.get("le")
        if le and le != "+Inf":
            buckets.append((float(le), sample.value))

    if not buckets:
        return None

    # Sort by bucket boundary
    buckets.sort(key=lambda x: x[0])

    # Get total count
    total = buckets[-1][1] if buckets else 0
    if total == 0:
        return None

    # Find bucket containing quantile
    target = quantile * total
    prev_bound = 0
    prev_count = 0

    for bound, count in buckets:
        if count >= target:
            # Linear interpolation within bucket
            fraction = (target - prev_count) / (count - prev_count) if count > prev_count else 0
            return prev_bound + fraction * (bound - prev_bound)
        prev_bound = bound
        prev_count = count

    return buckets[-1][0] if buckets else None