from typing import Dict, List, Any, Optional import os import json class MetricMapper: """ Utility for mapping Prometheus label sets into internal node IDs. In environments with many pods, we need to decide which pods to aggregate for a given node_id. """ def __init__(self, mapping_strategy: str = "sum"): self.strategy = mapping_strategy.lower() self.node_mapping = self._load_node_mapping() def _load_node_mapping(self) -> Dict[str, str]: """Loads label-value -> node_id mapping from env config.""" raw = os.getenv("ANTIATROPOS_LABEL_NODE_MAP", "") if raw: try: data = json.loads(raw) if isinstance(data, dict): return {str(k): str(v) for k, v in data.items()} except json.JSONDecodeError: pass # Safe default mapping for local demos. return { "payments": "node-0", "checkout": "node-1", "catalog": "node-2", "cart": "node-3", "auth": "node-4", } def _resolve_node_id(self, labels: Dict[str, Any]) -> Optional[str]: """Resolve internal node_id from a Prometheus sample labelset.""" explicit = labels.get("node_id") if explicit: return str(explicit) for key in ("pod", "service", "app", "workload", "deployment", "instance"): label_value = labels.get(key) if label_value and str(label_value) in self.node_mapping: return self.node_mapping[str(label_value)] return None def _reduce(self, values: List[float]) -> float: if not values: return 0.0 if self.strategy == "max": return max(values) if self.strategy == "mean": return sum(values) / len(values) # default: sum return sum(values) def aggregate_node_metrics(self, raw_metrics: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]: """ Aggregates labeled metric samples into node-level telemetry. Expected sample shape: { "metric_name": "request_rate", "labels": {"pod": "web-node-1"}, "value": 42.0 } """ bucket: Dict[str, Dict[str, List[float]]] = {} for sample in raw_metrics: metric_name = str(sample.get("metric_name", "")) labels = sample.get("labels") or {} value = sample.get("value") if not metric_name or not isinstance(labels, dict) or value is None: continue node_id = self._resolve_node_id(labels) if not node_id: continue try: val = float(value) except (TypeError, ValueError): continue bucket.setdefault(node_id, {}).setdefault(metric_name, []).append(val) aggregated: Dict[str, Dict[str, float]] = {} for node_id, metric_map in bucket.items(): aggregated[node_id] = { metric_name: self._reduce(values) for metric_name, values in metric_map.items() } return aggregated