import logging
import math
import os
import random
from typing import Any, Dict, List, Optional

import requests
from pydantic import BaseModel

from .mapping import MetricMapper
|
|
# The logger name is spelled out (rather than derived from __name__) so the
# channel stays "antiatropos.telemetry" however this module is imported.
_TELEMETRY_LOGGER_NAME = "antiatropos.telemetry"
logger = logging.getLogger(_TELEMETRY_LOGGER_NAME)
|
|
class TelemetryRecord(BaseModel):
    """
    Fully populated telemetry sample for a single node.

    In this module it is constructed only by the mock generator; the real
    Prometheus path returns plain per-node dicts that may omit fields.
    """

    node_id: str  # identifier of the node this sample describes
    latency_ms: float  # request latency in milliseconds (per the field name)
    request_rate: float  # request throughput; presumably requests/second — TODO confirm
    error_rate: float  # error fraction; the real path clamps its equivalent to [0, 1]
    cpu_utilization: float  # CPU usage; the real path clamps its equivalent to [0, 1]
    queue_depth: float  # queued work; the real path clamps its equivalent to >= 0
|
|
class PrometheusClient:
    """
    Adapter to fetch and normalize metrics from Prometheus.

    Supports a mock mode for local development without a live cluster: when
    no URL is configured (neither the ``prometheus_url`` argument nor the
    ``PROMETHEUS_URL`` environment variable) or the URL is the literal string
    ``"mock"`` (any case), random telemetry is generated instead of querying.

    Environment variables read at construction time:
        PROMETHEUS_URL                  Prometheus base URL (fallback for the argument).
        ANTIATROPOS_PROM_TIMEOUT_S      HTTP timeout per query, in seconds (default 2.5).
        ANTIATROPOS_STRICT_REAL         "true" makes real-mode failures raise instead of
                                        falling back to mock data. It does not override
                                        mock mode selected by the URL.
        ANTIATROPOS_METRIC_AGGREGATION  Strategy passed to MetricMapper (default "sum").
        ANTIATROPOS_PROM_QUERY_*        PromQL overrides for each logical metric.
    """

    def __init__(self, prometheus_url: Optional[str] = None):
        self.url = prometheus_url or os.getenv("PROMETHEUS_URL")
        self.is_mock = not self.url or self.url.lower() == "mock"
        self.timeout_s = float(os.getenv("ANTIATROPOS_PROM_TIMEOUT_S", "2.5"))
        # Only the string "true" (case-insensitive) enables strict mode.
        self.strict_real = os.getenv("ANTIATROPOS_STRICT_REAL", "false").lower() == "true"
        self.metric_mapper = MetricMapper(
            mapping_strategy=os.getenv("ANTIATROPOS_METRIC_AGGREGATION", "sum")
        )

        # Default PromQL per logical metric. A query containing the literal
        # placeholder "{node_id}" is run once per node (see
        # _collect_metric_values); otherwise it is run once as a vector query
        # and its samples are attributed to nodes by MetricMapper.
        self.request_rate_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_REQUEST_RATE",
            'sum(rate(http_requests_total[1m])) by (pod)'
        )
        self.latency_ms_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_LATENCY_MS",
            'histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (pod, le)) * 1000'
        )
        # NOTE(review): clamp_min(..., 1) avoids division by zero but
        # under-reports the error ratio for pods serving < 1 request/second.
        self.error_rate_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_ERROR_RATE",
            'sum(rate(http_requests_total{status=~"5.."}[1m])) by (pod) / clamp_min(sum(rate(http_requests_total[1m])) by (pod), 1)'
        )
        # NOTE(review): rate() of a CPU-seconds counter yields CPU cores in
        # use, not a fraction of a limit; _fetch_real_metrics clamps it to [0, 1].
        self.cpu_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_CPU",
            'avg(rate(container_cpu_usage_seconds_total[1m])) by (pod)'
        )
        self.queue_depth_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_QUEUE_DEPTH",
            'sum(queue_depth) by (pod)'
        )

    def fetch_latest_metrics(self, node_ids: List[str]) -> Dict[str, Any]:
        """
        Return the latest telemetry for ``node_ids``.

        In mock mode this is a dict of node_id -> TelemetryRecord with every
        node present. In real mode it is the result of _fetch_real_metrics:
        node_id -> dict of the metrics that were available (nodes without
        any usable metric are omitted).

        If the real fetch raises and strict mode is off, a warning is logged
        and mock TelemetryRecords are returned instead; in strict mode the
        exception propagates.
        """
        if self.is_mock:
            return self._generate_mock_metrics(node_ids)

        try:
            return self._fetch_real_metrics(node_ids)
        except Exception:
            if self.strict_real:
                raise
            # Deliberate best-effort fallback, but never silent: mock data
            # must not masquerade as real telemetry without a trace in logs.
            logger.warning(
                "Prometheus fetch from %s failed; falling back to mock telemetry "
                "for %d node(s).",
                self.url,
                len(node_ids),
                exc_info=True,
            )
            return self._generate_mock_metrics(node_ids)

    def _fetch_real_metrics(self, node_ids: List[str]) -> Dict[str, Any]:
        """
        Fetch node telemetry from Prometheus instant queries.

        Returns a dict mapping node_id -> {metric_name: float} that contains
        only metrics with a finite value; nodes with no usable value are
        omitted. error_rate and cpu_utilization are clamped to [0, 1] and
        queue_depth to >= 0.

        Raises:
            RuntimeError: in strict mode, when no requested node produced any
                usable value.
            Any exception from the HTTP layer or MetricMapper propagates.
        """
        metrics: Dict[str, Any] = {}

        req_by_node = self._collect_metric_values("request_rate", self.request_rate_query, node_ids)
        lat_by_node = self._collect_metric_values("latency_ms", self.latency_ms_query, node_ids)
        err_by_node = self._collect_metric_values("error_rate", self.error_rate_query, node_ids)
        cpu_by_node = self._collect_metric_values("cpu_utilization", self.cpu_query, node_ids)
        q_by_node = self._collect_metric_values("queue_depth", self.queue_depth_query, node_ids)

        for node_id in node_ids:
            # Prometheus reports NaN (e.g. histogram_quantile over an empty
            # window, 0/0) and +/-Inf as ordinary sample values. Treat them as
            # missing: clamping NaN would otherwise produce 1.0, because
            # min(1.0, nan) evaluates to 1.0 (a fake 100% error rate / CPU).
            lat_ms = self._finite_or_none(lat_by_node.get(node_id))
            req_rate = self._finite_or_none(req_by_node.get(node_id))
            err_rate = self._finite_or_none(err_by_node.get(node_id))
            cpu = self._finite_or_none(cpu_by_node.get(node_id))
            q_depth = self._finite_or_none(q_by_node.get(node_id))

            node_payload: Dict[str, float] = {}
            if lat_ms is not None:
                node_payload["latency_ms"] = lat_ms
            if req_rate is not None:
                node_payload["request_rate"] = req_rate
            if err_rate is not None:
                node_payload["error_rate"] = max(0.0, min(1.0, err_rate))
            if cpu is not None:
                node_payload["cpu_utilization"] = max(0.0, min(1.0, cpu))
            if q_depth is not None:
                node_payload["queue_depth"] = max(0.0, q_depth)
            if node_payload:
                metrics[node_id] = node_payload

        # Any non-empty payload means at least one real signal was observed.
        if not metrics:
            if self.strict_real:
                raise RuntimeError("Prometheus returned no usable real telemetry for requested node IDs.")
            logger.warning(
                "No per-node Prometheus samples found for configured queries; "
                "skipping telemetry reconciliation for this step."
            )

        return metrics

    def _collect_metric_values(
        self,
        metric_name: str,
        query: str,
        node_ids: List[str],
    ) -> Dict[str, Optional[float]]:
        """
        Collect node values for one logical metric.

        If ``query`` contains the placeholder "{node_id}", one scalar query is
        executed per node with the placeholder substituted. Otherwise a single
        vector query is run and its labelled samples are aggregated per node
        via MetricMapper. Every requested node_id is present in the result;
        nodes without a value map to None.
        """
        out: Dict[str, Optional[float]] = {node_id: None for node_id in node_ids}

        if "{node_id}" in query:
            for node_id in node_ids:
                out[node_id] = self._query_scalar(self._render_node_query(query, node_id))
            return out

        raw_metrics: List[Dict[str, Any]] = []
        for sample in self._query_vector(query):
            labels = sample.get("metric")
            value = sample.get("value")
            # Instant-vector samples carry value == [timestamp, "<number>"].
            if not isinstance(labels, dict):
                continue
            if not isinstance(value, (list, tuple)) or len(value) < 2:
                continue
            raw_metrics.append(
                {
                    "metric_name": metric_name,
                    "labels": labels,
                    "value": value[1],
                }
            )

        by_node = self.metric_mapper.aggregate_node_metrics(raw_metrics)
        for node_id in node_ids:
            out[node_id] = (by_node.get(node_id) or {}).get(metric_name)

        return out

    @staticmethod
    def _render_node_query(query: str, node_id: str) -> str:
        """
        Substitute ``node_id`` into a per-node query template.

        ``str.format`` is tried first so templates that escape PromQL braces
        as ``{{`` / ``}}`` keep working. A natural PromQL template such as
        ``up{pod="{node_id}"}`` makes ``str.format`` raise (label-matcher
        braces are parsed as replacement fields), so in that case the
        placeholder is replaced literally instead.
        """
        try:
            return query.format(node_id=node_id)
        except (KeyError, IndexError, ValueError):
            return query.replace("{node_id}", node_id)

    def _query_scalar(self, promql: str) -> Optional[float]:
        """Run an instant query and return its first value as a float, or None."""
        return self._first_value(self._run_instant_query(promql))

    def _query_vector(self, promql: str) -> List[Dict[str, Any]]:
        """
        Run an instant query and return its sample objects.

        Only dict samples (as produced by vector/matrix results) are returned;
        scalar/string results and malformed payloads yield an empty list.
        """
        data = self._run_instant_query(promql)
        if data is None:
            return []
        result = data.get("result")
        if not isinstance(result, list):
            return []
        return [sample for sample in result if isinstance(sample, dict)]

    def _run_instant_query(self, promql: str) -> Optional[Dict[str, Any]]:
        """
        GET ``/api/v1/query`` for ``promql`` and return the response's ``data``.

        Returns None when no URL is configured, when the response status is
        not "success", or when ``data`` is not an object. HTTP errors
        (raise_for_status), timeouts, and JSON decoding errors propagate.
        """
        if not self.url:
            return None

        response = requests.get(
            f"{self.url.rstrip('/')}/api/v1/query",
            params={"query": promql},
            timeout=self.timeout_s,
        )
        response.raise_for_status()
        payload = response.json()

        if not isinstance(payload, dict) or payload.get("status") != "success":
            return None
        data = payload.get("data")
        return data if isinstance(data, dict) else None

    @staticmethod
    def _first_value(data: Optional[Dict[str, Any]]) -> Optional[float]:
        """
        Extract the first numeric value from a Prometheus query ``data`` object.

        Scalar/string results hold ``[timestamp, "<value>"]`` directly in
        ``result``; vector results hold a list of ``{"metric", "value"}``
        objects, of which the first is used. Returns None when no parsable
        value is present.
        """
        if not isinstance(data, dict):
            return None

        result = data.get("result")
        if data.get("resultType") in ("scalar", "string"):
            value = result
        elif isinstance(result, list) and result and isinstance(result[0], dict):
            value = result[0].get("value")
        else:
            return None

        if not isinstance(value, (list, tuple)) or len(value) < 2:
            return None
        try:
            return float(value[1])
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _finite_or_none(value: Any) -> Optional[float]:
        """Convert ``value`` to float; return None if missing, unparsable, NaN or infinite."""
        if value is None:
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if math.isfinite(number) else None

    def _generate_mock_metrics(self, node_ids: List[str]) -> Dict[str, "TelemetryRecord"]:
        """
        Generate random telemetry for every node in ``node_ids``.

        Each field is drawn uniformly from a fixed range using the module-level
        ``random`` generator, so output is reproducible only if the caller
        seeds ``random``.
        """
        metrics = {}
        for nid in node_ids:
            metrics[nid] = TelemetryRecord(
                node_id=nid,
                latency_ms=random.uniform(20.0, 150.0),
                request_rate=random.uniform(10.0, 50.0),
                error_rate=random.uniform(0.0, 0.05),
                cpu_utilization=random.uniform(0.1, 0.8),
                queue_depth=random.uniform(0.0, 50.0)
            )
        return metrics
|
|