import json
import math
import os
from collections.abc import Mapping
from typing import Dict, List, Any, Optional
|
|
class MetricMapper:
    """
    Utility for mapping Prometheus label sets into internal node IDs.

    In environments with many pods, several samples can resolve to the same
    node_id. Their values are combined per metric name using
    ``mapping_strategy`` ("sum", "max" or "mean", case-insensitive; any other
    value falls back to "sum").
    """

    def __init__(self, mapping_strategy: str = "sum"):
        # Normalised so "MAX" and "max" select the same reducer in _reduce().
        self.strategy = mapping_strategy.lower()
        self.node_mapping = self._load_node_mapping()

    def _load_node_mapping(self) -> Dict[str, str]:
        """
        Load the label-value -> node_id mapping.

        Reads the ``ANTIATROPOS_LABEL_NODE_MAP`` environment variable as a JSON
        object and coerces its keys and values to ``str``. If the variable is
        unset, empty, not valid JSON, or not a JSON object, the built-in
        default mapping is returned instead.
        """
        raw = os.getenv("ANTIATROPOS_LABEL_NODE_MAP", "")
        if raw:
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                # Deliberate best-effort: a malformed override falls back to
                # the defaults below rather than failing construction.
                data = None
            if isinstance(data, dict):
                return {str(k): str(v) for k, v in data.items()}

        return {
            "payments": "node-0",
            "checkout": "node-1",
            "catalog": "node-2",
            "cart": "node-3",
            "auth": "node-4",
        }

    def _resolve_node_id(self, labels: Dict[str, Any]) -> Optional[str]:
        """
        Resolve the internal node_id for one Prometheus sample label set.

        A truthy ``node_id`` label is used directly. Otherwise the labels
        ``pod``, ``service``, ``app``, ``workload``, ``deployment`` and
        ``instance`` are tried in that order. The first value that is a key
        of ``self.node_mapping`` is translated. Returns ``None`` if nothing
        matches.
        """
        explicit = labels.get("node_id")
        if explicit:
            return str(explicit)

        # NOTE(review): matching is exact string equality, so generated pod
        # names (e.g. "payments-7d9f8c-abcde") will not match a "payments"
        # key; confirm whether prefix matching is wanted.
        for key in ("pod", "service", "app", "workload", "deployment", "instance"):
            label_value = labels.get(key)
            if label_value and str(label_value) in self.node_mapping:
                return self.node_mapping[str(label_value)]

        return None

    def _reduce(self, values: List[float]) -> float:
        """
        Combine the values collected for one (node_id, metric) pair.

        Returns 0.0 for an empty list. "max" and "mean" are honoured; every
        other strategy (including the default "sum") sums the values.
        """
        if not values:
            return 0.0
        if self.strategy == "max":
            return max(values)
        if self.strategy == "mean":
            return sum(values) / len(values)
        return sum(values)

    def aggregate_node_metrics(self, raw_metrics: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
        """
        Aggregate labeled metric samples into node-level telemetry.

        Expected sample shape:
            {
                "metric_name": "request_rate",
                "labels": {"pod": "payments"},
                "value": 42.0
            }

        Samples are silently skipped if any of the following hold:
        - the sample is not a mapping;
        - the metric name is missing, ``None`` or empty;
        - the labels are not a dict;
        - the value is missing, cannot be converted with ``float()``, or is NaN;
        - no node_id can be resolved from the labels.

        Returns:
            ``{node_id: {metric_name: reduced_value}}``. Only nodes and
            metrics that had at least one accepted sample appear.
        """
        bucket: Dict[str, Dict[str, List[float]]] = {}

        for sample in raw_metrics:
            # A stray non-mapping entry is just another malformed sample;
            # skip it instead of aborting the whole batch on .get().
            if not isinstance(sample, Mapping):
                continue

            raw_name = sample.get("metric_name")
            labels = sample.get("labels") or {}
            value = sample.get("value")
            # Test for None *before* str(): str(None) == "None" is truthy and
            # would otherwise be accepted as a real metric name.
            if raw_name is None or value is None or not isinstance(labels, dict):
                continue
            metric_name = str(raw_name)
            if not metric_name:
                continue

            node_id = self._resolve_node_id(labels)
            if not node_id:
                continue

            try:
                val = float(value)
            except (TypeError, ValueError):
                continue
            # Prometheus can report "NaN"; one NaN would poison sum/mean and
            # make max() depend on input order, so treat it as "no data".
            if math.isnan(val):
                continue

            bucket.setdefault(node_id, {}).setdefault(metric_name, []).append(val)

        return {
            node_id: {name: self._reduce(values) for name, values in metric_map.items()}
            for node_id, metric_map in bucket.items()
        }
|
|