import threading
from typing import Optional
# prometheus_client is an optional dependency. When the import fails, the three
# metric classes are bound to None and generate_latest() is replaced by a stub
# so the rest of the module can still be imported and called.
try:
    from prometheus_client import Counter, Gauge, Histogram, generate_latest
except ImportError:  # pragma: no cover
    Counter = None
    Gauge = None
    Histogram = None

    def generate_latest() -> bytes:  # type: ignore[override]
        """Stand-in exporter: return a single explanatory comment line."""
        return b"# prometheus_client not installed\n"
def _enabled() -> bool:
    """Report whether the Counter, Gauge and Histogram classes are all available.

    Returns False as soon as any one of the three module-level names is None.
    """
    return not (Counter is None or Gauge is None or Histogram is None)
class ObservabilityTracker:
    """Prometheus metrics for action/reward/health monitoring.

    When ``_enabled()`` reports that the metric classes are unavailable the
    instance is inert: no metric attributes are created and ``record_step``
    returns immediately.
    """

    def __init__(self) -> None:
        """Create the update lock and, if metrics are enabled, every metric."""
        self._lock = threading.Lock()
        self._is_enabled = _enabled()
        if not self._is_enabled:
            # Metric attributes are intentionally left undefined; record_step
            # checks _is_enabled before touching any of them.
            return

        self.steps_total = Counter(
            "antiatropos_steps_total",
            "Total environment steps",
            ["task_id", "mode"],
        )
        self.actions_total = Counter(
            "antiatropos_actions_total",
            "Actions executed by type/target/status",
            ["task_id", "mode", "action_type", "target_node_id", "ack_class"],
        )
        self.executor_errors_total = Counter(
            "antiatropos_executor_errors_total",
            "Executor errors by code",
            ["mode", "error_code"],
        )
        self.executor_latency_ms = Histogram(
            "antiatropos_executor_latency_ms",
            "Executor latency in milliseconds",
            ["mode"],
            buckets=(1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000),
        )
        self.reward_gauge = Gauge(
            "antiatropos_reward",
            "Latest output reward value (depends on reward output mode)",
            ["task_id", "mode"],
        )
        self.reward_raw_gauge = Gauge(
            "antiatropos_reward_raw",
            "Latest raw reward value before normalization",
            ["task_id", "mode"],
        )
        self.reward_normalized_gauge = Gauge(
            "antiatropos_reward_normalized",
            "Latest normalized reward value in [0,1]",
            ["task_id", "mode"],
        )
        self.lyapunov_gauge = Gauge(
            "antiatropos_lyapunov_energy",
            "Latest Lyapunov energy",
            ["task_id", "mode"],
        )
        self.queue_gauge = Gauge(
            "antiatropos_total_queue_backlog",
            "Latest normalized total queue backlog",
            ["task_id", "mode"],
        )
        # NOTE(review): the metric name and help text say "normalized", but
        # record_step feeds this gauge from its average_latency_ms argument --
        # confirm which unit callers actually pass.
        self.latency_gauge = Gauge(
            "antiatropos_average_latency_norm",
            "Latest normalized average latency",
            ["task_id", "mode"],
        )

    def record_step(
        self,
        task_id: str,
        mode: str,
        action_type: str,
        target_node_id: str,
        ack_status: str,
        reward_output: float,
        reward_raw: float,
        reward_normalized: float,
        lyapunov_energy: float,
        total_queue_backlog: float,
        average_latency_ms: float,
        executor_latency_ms: float,
        executor_error_code: str,
    ) -> None:
        """Record one environment step across all metrics.

        Increments the step and action counters, sets the six gauges to the
        supplied values, observes the executor latency (negative values are
        clamped to 0.0) and, when ``executor_error_code`` is truthy, increments
        the executor error counter. Does nothing when metrics are disabled.

        Raises:
            TypeError, ValueError: if a numeric argument cannot be converted
                with ``float()``. The conversion happens before any metric is
                modified, so a failed call leaves every metric untouched.
        """
        if not self._is_enabled:
            return

        # Coerce every numeric input up front. Doing this while already
        # mutating metrics would let a bad value abort the call half-way and
        # leave counters incremented without the matching gauge updates.
        reward_output_value = float(reward_output)
        reward_raw_value = float(reward_raw)
        reward_normalized_value = float(reward_normalized)
        lyapunov_value = float(lyapunov_energy)
        queue_value = float(total_queue_backlog)
        latency_value = float(average_latency_ms)
        executor_latency_value = max(0.0, float(executor_latency_ms))

        ack_class = self._classify_ack(ack_status)
        step_labels = {"task_id": task_id, "mode": mode}

        # The lock groups all updates for one step into a single critical
        # section with respect to other record_step calls on this tracker.
        with self._lock:
            self.steps_total.labels(**step_labels).inc()
            self.actions_total.labels(
                task_id=task_id,
                mode=mode,
                action_type=action_type,
                target_node_id=target_node_id,
                ack_class=ack_class,
            ).inc()
            self.reward_gauge.labels(**step_labels).set(reward_output_value)
            self.reward_raw_gauge.labels(**step_labels).set(reward_raw_value)
            self.reward_normalized_gauge.labels(**step_labels).set(reward_normalized_value)
            self.lyapunov_gauge.labels(**step_labels).set(lyapunov_value)
            self.queue_gauge.labels(**step_labels).set(queue_value)
            self.latency_gauge.labels(**step_labels).set(latency_value)
            self.executor_latency_ms.labels(mode=mode).observe(executor_latency_value)
            if executor_error_code:
                self.executor_errors_total.labels(
                    mode=mode, error_code=executor_error_code
                ).inc()

    @staticmethod
    def _classify_ack(ack_status: str) -> str:
        """Map a raw acknowledgement string to a coarse label value.

        The input is passed through ``str()`` first, then matched by prefix:
        ``"Ack:"`` or ``"success"`` -> ``"ack"``, ``"Rejected:"`` ->
        ``"rejected"``, ``"Error:"`` -> ``"error"``; anything else is
        ``"unknown"``. Matching is case-sensitive.
        """
        status = str(ack_status)
        if status.startswith(("Ack:", "success")):
            return "ack"
        if status.startswith("Rejected:"):
            return "rejected"
        if status.startswith("Error:"):
            return "error"
        return "unknown"
# Lazily created, process-wide tracker instance (see get_observability_tracker).
_TRACKER: Optional[ObservabilityTracker] = None
# Serializes first-time construction of _TRACKER.
_TRACKER_LOCK = threading.Lock()


def get_observability_tracker() -> ObservabilityTracker:
    """Return the shared ObservabilityTracker, creating it on first use.

    Construction is guarded by a lock so that concurrent first calls cannot
    each build a tracker. A second construction would try to register the same
    metric names again, which prometheus_client is documented to reject in a
    single registry.

    Returns:
        The single module-level ObservabilityTracker instance.
    """
    global _TRACKER
    if _TRACKER is None:
        with _TRACKER_LOCK:
            # Re-check under the lock: another thread may have finished
            # constructing the tracker while this one was waiting.
            if _TRACKER is None:
                _TRACKER = ObservabilityTracker()
    return _TRACKER
def render_prometheus_metrics() -> bytes:
    """Return the current metrics payload as bytes.

    Delegates to the module-level ``generate_latest``, called with no
    arguments; the result is passed through unchanged.
    """
    payload = generate_latest()
    return payload
|