AntiAtropos / telemetry /prometheus_client.py
div18
chore(kubernetes): support unbounded max replicas and enhance local setup
923f89f
import logging
import math
import os
import random
from typing import Any, Dict, List, Optional

import requests
from pydantic import BaseModel

from .mapping import MetricMapper
# Module-level logger for this file; the name is fixed explicitly
# ("antiatropos.telemetry") rather than derived from __name__.
logger = logging.getLogger("antiatropos.telemetry")
class TelemetryRecord(BaseModel):
    """
    One telemetry sample for a single node.

    Every field is required; pydantic validates the declared types when the
    record is constructed. In this file the model is only instantiated by the
    mock metrics generator.
    """

    # Identifier of the node this sample describes.
    node_id: str

    # Request latency; milliseconds, going by the field name.
    latency_ms: float

    # Request throughput for the node (units are set by the producing query).
    request_rate: float

    # Share of failing requests; presumably a 0..1 fraction -- confirm with callers.
    error_rate: float

    # CPU usage; presumably a 0..1 fraction -- confirm with callers.
    cpu_utilization: float

    # Number of queued items on the node.
    queue_depth: float
class PrometheusClient:
    """
    Adapter to fetch and normalize metrics from Prometheus.

    Supports a mock mode for local development without a live cluster.
    Mock mode is selected when no URL is configured, or when the configured
    URL is the literal string "mock" (case-insensitive).

    Configuration is read from the environment when the client is built:
    PROMETHEUS_URL, ANTIATROPOS_PROM_TIMEOUT_S, ANTIATROPOS_STRICT_REAL,
    ANTIATROPOS_METRIC_AGGREGATION and the ANTIATROPOS_PROM_QUERY_* overrides
    for the individual PromQL expressions.
    """

    def __init__(self, prometheus_url: Optional[str] = None):
        """
        Build a client.

        Args:
            prometheus_url: Base URL of the Prometheus server. When omitted,
                the PROMETHEUS_URL environment variable is used instead.

        Raises:
            ValueError: If ANTIATROPOS_PROM_TIMEOUT_S is not a valid float.
        """
        # Use provided URL or env var, defaulting to mock if neither is found
        self.url = prometheus_url or os.getenv("PROMETHEUS_URL")
        self.is_mock = not self.url or self.url.lower() == "mock"
        self.timeout_s = float(os.getenv("ANTIATROPOS_PROM_TIMEOUT_S", "2.5"))
        # In strict mode, failures of the real backend are raised instead of
        # being papered over with mock data.
        self.strict_real = os.getenv("ANTIATROPOS_STRICT_REAL", "false").lower() == "true"
        self.metric_mapper = MetricMapper(
            mapping_strategy=os.getenv("ANTIATROPOS_METRIC_AGGREGATION", "sum")
        )
        self.request_rate_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_REQUEST_RATE",
            'sum(rate(http_requests_total[1m])) by (pod)'
        )
        self.latency_ms_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_LATENCY_MS",
            'histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (pod, le)) * 1000'
        )
        self.error_rate_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_ERROR_RATE",
            'sum(rate(http_requests_total{status=~"5.."}[1m])) by (pod) / clamp_min(sum(rate(http_requests_total[1m])) by (pod), 1)'
        )
        self.cpu_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_CPU",
            'avg(rate(container_cpu_usage_seconds_total[1m])) by (pod)'
        )
        self.queue_depth_query = os.getenv(
            "ANTIATROPOS_PROM_QUERY_QUEUE_DEPTH",
            'sum(queue_depth) by (pod)'
        )

    def fetch_latest_metrics(self, node_ids: List[str]) -> Dict[str, Any]:
        """
        Query Prometheus for the latest metrics for the given nodes.

        The shape of the values depends on the path taken:

        * Real path: ``node_id -> {metric_name: float}``. Only metrics that
          had a usable sample are present, and nodes without any usable
          sample are omitted entirely.
        * Mock path (mock mode, or non-strict fallback after a failure):
          ``node_id -> TelemetryRecord`` with every field populated.

        Raises:
            Exception: In strict mode, whatever the real fetch raised
                (HTTP errors, timeouts, or RuntimeError when no node had
                usable telemetry).
        """
        if self.is_mock:
            return self._generate_mock_metrics(node_ids)

        # Real implementation using queries
        try:
            return self._fetch_real_metrics(node_ids)
        except Exception:
            if self.strict_real:
                raise
            # Best-effort fallback is deliberate, but it must be visible:
            # otherwise random mock data silently replaces real telemetry.
            logger.warning(
                "Prometheus telemetry fetch failed; falling back to mock metrics.",
                exc_info=True,
            )
            return self._generate_mock_metrics(node_ids)

    def _fetch_real_metrics(self, node_ids: List[str]) -> Dict[str, Any]:
        """
        Fetches node telemetry from Prometheus instant queries.

        Returns:
            ``node_id -> {metric_name: float}`` for every node that had at
            least one usable sample. Error rate and CPU utilization are
            clamped to [0, 1]; queue depth is clamped to be non-negative.

        Raises:
            RuntimeError: In strict mode, when no requested node had any
                usable sample.
        """
        req_by_node = self._collect_metric_values("request_rate", self.request_rate_query, node_ids)
        lat_by_node = self._collect_metric_values("latency_ms", self.latency_ms_query, node_ids)
        err_by_node = self._collect_metric_values("error_rate", self.error_rate_query, node_ids)
        cpu_by_node = self._collect_metric_values("cpu_utilization", self.cpu_query, node_ids)
        q_by_node = self._collect_metric_values("queue_depth", self.queue_depth_query, node_ids)

        metrics: Dict[str, Any] = {}
        saw_any_real_signal = False

        for node_id in node_ids:
            # (payload key, raw sample, lower clamp, upper clamp)
            readings = (
                ("latency_ms", lat_by_node.get(node_id), None, None),
                ("request_rate", req_by_node.get(node_id), None, None),
                ("error_rate", err_by_node.get(node_id), 0.0, 1.0),
                ("cpu_utilization", cpu_by_node.get(node_id), 0.0, 1.0),
                ("queue_depth", q_by_node.get(node_id), 0.0, None),
            )

            node_payload: Dict[str, float] = {}
            for key, raw, lower, upper in readings:
                # NaN/Inf (e.g. histogram_quantile with no traffic) must be
                # treated as "no sample": clamping a NaN with min()/max()
                # would otherwise turn it into an arbitrary bound.
                number = self._to_finite_float(raw)
                if number is None:
                    continue
                if upper is not None:
                    number = min(upper, number)
                if lower is not None:
                    number = max(lower, number)
                node_payload[key] = number

            if not node_payload:
                # No usable sample for this node this cycle; skip reconciliation
                # so simulator dynamics are preserved instead of being collapsed
                # toward zero by synthetic defaults.
                continue

            saw_any_real_signal = True
            metrics[node_id] = node_payload

        if self.strict_real and not saw_any_real_signal:
            raise RuntimeError("Prometheus returned no usable real telemetry for requested node IDs.")
        if not saw_any_real_signal:
            logger.warning(
                "No per-node Prometheus samples found for configured queries; "
                "skipping telemetry reconciliation for this step."
            )
        return metrics

    def _collect_metric_values(
        self,
        metric_name: str,
        query: str,
        node_ids: List[str],
    ) -> Dict[str, Optional[float]]:
        """
        Collect node values for one logical metric.

        If query contains "{node_id}", execute per-node scalar queries.
        Otherwise run one vector query and aggregate labels via MetricMapper.

        Returns:
            A dict with one entry per requested node; the value is None when
            no usable sample was found for that node.
        """
        out: Dict[str, Optional[float]] = {node_id: None for node_id in node_ids}

        if "{node_id}" in query:
            for node_id in node_ids:
                out[node_id] = self._query_scalar(self._render_node_query(query, node_id))
            return out

        raw_metrics: List[Dict[str, Any]] = []
        for sample in self._query_vector(query):
            # A scalar-typed result is a flat [timestamp, value] list, so its
            # elements are not label/value dicts; skip anything of that kind.
            if not isinstance(sample, dict):
                continue
            labels = sample.get("metric")
            value = sample.get("value")
            if not isinstance(labels, dict):
                continue
            if not value or len(value) < 2:
                continue
            # Drop NaN/Inf/unparsable samples so one bad series cannot poison
            # the per-node aggregate. The raw value is still what gets passed on.
            if self._to_finite_float(value[1]) is None:
                continue
            raw_metrics.append(
                {
                    "metric_name": metric_name,
                    "labels": labels,
                    "value": value[1],
                }
            )

        by_node = self.metric_mapper.aggregate_node_metrics(raw_metrics)
        for node_id in node_ids:
            metric_map = by_node.get(node_id) or {}
            out[node_id] = metric_map.get(metric_name)
        return out

    @staticmethod
    def _render_node_query(query: str, node_id: str) -> str:
        """
        Substitute ``node_id`` into a per-node query template.

        ``str.format`` is tried first so templates that escape literal braces
        (``{{`` / ``}}``) keep working. PromQL label selectors contain literal
        braces that ``str.format`` rejects, so on failure the ``{node_id}``
        placeholder is replaced verbatim instead.
        """
        try:
            return query.format(node_id=node_id)
        except (KeyError, IndexError, ValueError):
            return query.replace("{node_id}", node_id)

    @staticmethod
    def _to_finite_float(value: Any) -> Optional[float]:
        """Return ``value`` as a finite float, or None if missing, unparsable, NaN or infinite."""
        if value is None:
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if math.isfinite(number) else None

    def _run_instant_query(self, promql: str) -> List[Any]:
        """
        Run one Prometheus instant query and return ``data.result`` as a list.

        Returns an empty list when no URL is configured, when Prometheus
        reports a non-success status, or when the payload has no list result.

        Raises:
            requests.RequestException: On transport errors, timeouts, or a
                non-2xx HTTP status.
            ValueError: If the response body is not valid JSON.
        """
        if not self.url:
            return []
        response = requests.get(
            f"{self.url.rstrip('/')}/api/v1/query",
            params={"query": promql},
            timeout=self.timeout_s,
        )
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, dict) or payload.get("status") != "success":
            return []
        # "data" may be missing or null; treat both as "no result".
        data = payload.get("data") or {}
        result = data.get("result") if isinstance(data, dict) else None
        return result if isinstance(result, list) else []

    def _query_scalar(self, promql: str) -> Optional[float]:
        """
        Runs a scalar/vector Prometheus instant query and returns the first value.

        Returns None when there is no result, or when the value is unparsable
        or not finite.
        """
        result = self._run_instant_query(promql)
        if not result:
            return None
        first = result[0]
        # Vector results are [{"metric": ..., "value": [ts, "v"]}, ...];
        # scalar results are the flat pair [ts, "v"] itself.
        value = first.get("value") if isinstance(first, dict) else result
        if not value or len(value) < 2:
            return None
        return self._to_finite_float(value[1])

    def _query_vector(self, promql: str) -> List[Dict[str, Any]]:
        """Runs a Prometheus instant query and returns the full vector result list."""
        return self._run_instant_query(promql)

    def _generate_mock_metrics(self, node_ids: List[str]) -> "Dict[str, TelemetryRecord]":
        """
        Generates realistic-looking mock telemetry.

        Values are drawn uniformly at random on every call, so results are
        not reproducible unless the caller seeds ``random``.
        """
        metrics = {}
        for nid in node_ids:
            metrics[nid] = TelemetryRecord(
                node_id=nid,
                latency_ms=random.uniform(20.0, 150.0),
                request_rate=random.uniform(10.0, 50.0),
                error_rate=random.uniform(0.0, 0.05),
                cpu_utilization=random.uniform(0.1, 0.8),
                queue_depth=random.uniform(0.0, 50.0)
            )
        return metrics