| import time |
| import json |
| import os |
| import logging |
| from uuid import uuid4 |
|
|
| from openenv.core.env_server.interfaces import Environment |
| from openenv.core.env_server.types import State |
|
|
| try: |
| from ..models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode |
| from ..simulator import ClusterSimulator, DEFAULT_CAPACITY, COST_PER_CAPACITY_UNIT_PER_HOUR, OVERPROVISION_COST_PER_UNIT, CLUSTER_TOPOLOGY |
| from ..stability import ( |
| compute_lyapunov, |
| compute_lyapunov_graph, |
| compute_reward, |
| compute_barrier, |
| normalize_reward, |
| smooth_sla_penalty, |
| compute_drift, |
| BARRIER_NORM_SCALE, |
| REWARD_SCALE_VERSION, |
| ) |
| from ..telemetry import PrometheusClient, get_observability_tracker |
| from ..control import KubernetesExecutor, ActionValidator |
| except ImportError: |
| from models import SREAction, ClusterObservation, NodeObservation, NodeStatus, EnvironmentMode |
| from simulator import ClusterSimulator, DEFAULT_CAPACITY, COST_PER_CAPACITY_UNIT_PER_HOUR, OVERPROVISION_COST_PER_UNIT, CLUSTER_TOPOLOGY |
| from stability import ( |
| compute_lyapunov, |
| compute_lyapunov_graph, |
| compute_reward, |
| compute_barrier, |
| normalize_reward, |
| smooth_sla_penalty, |
| compute_drift, |
| BARRIER_NORM_SCALE, |
| REWARD_SCALE_VERSION, |
| ) |
| from telemetry import PrometheusClient, get_observability_tracker |
| from control import KubernetesExecutor, ActionValidator |
| import math |
|
|
|
|
| |
| |
| |
|
|
# Reward-term weights. ALPHA/BETA/GAMMA/DELTA are passed to compute_reward()
# in step() and are also used there to report the per-term reward breakdown.
ALPHA: float = 0.002  # weight on the Lyapunov drift term
BETA: float = 1.5  # weight on the infrastructure cost term
GAMMA: float = 4.0  # weight on the per-step SLA penalty term
DELTA: float = 0.1  # weight on the barrier term
# NOTE(review): ZETA is not referenced in the visible part of this file —
# presumably another reward weight consumed elsewhere; confirm or remove.
ZETA: float = 0.01


# NOTE(review): not referenced in the visible part of this file — presumably a
# threshold for repeated identical actions; confirm against its consumer.
REPETITION_SOFT_LIMIT = 3


# Divisors used to scale raw metrics before clamping them for observations.
MAX_QUEUE_NORM = 200.0  # queue depth divisor
MAX_LATENCY_NORM = 1000.0  # latency (ms) divisor; also caps per-node latency in _avg_latency
MAX_REQUEST_RATE_NORM = 100.0  # request-rate / outflow-rate divisor


MAX_STEPS: int = 100  # an episode is marked done once step_count reaches this value
N_NODES: int = 5  # node count given to ClusterSimulator; also scales total_queue_backlog
# Accepted values for the ANTIATROPOS_REWARD_OUTPUT_MODE environment variable.
REWARD_OUTPUT_MODES = {"normalized", "raw"}
|
|
|
|
class AntiAtroposEnvironment(Environment):
    """
    Autonomous SRE simulation environment.

    The agent observes a microservice cluster and issues management commands
    (SCALE_UP, SCALE_DOWN, REROUTE_TRAFFIC, SHED_LOAD, NO_OP) each step.
    The environment advances one discrete time-tick per step, computes the
    Lyapunov reward, and returns the updated ClusterObservation.

    Depending on the selected ``EnvironmentMode`` the environment is purely
    simulated, reconciles the simulator with fetched telemetry (HYBRID and
    LIVE), or additionally forwards actions to the executor (LIVE only).
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    @staticmethod
    def _parse_mode(raw_mode: str | None) -> EnvironmentMode:
        """Resolve the environment mode from the env var and/or ``raw_mode``.

        Precedence: a non-empty ``ANTIATROPOS_ENV_MODE`` environment variable
        wins over ``raw_mode``; if neither is set the mode is "simulated".
        The aliases "prod" and "production" map to "live". Any value that
        ``EnvironmentMode`` does not accept falls back to SIMULATED.
        """
        # NOTE(review): the env var overriding the explicit argument looks
        # deliberate (deployment-level override) — confirm with callers.
        env_mode = os.getenv("ANTIATROPOS_ENV_MODE", "").strip().lower()
        candidate = (env_mode or raw_mode or "simulated").strip().lower()
        alias = {
            "prod": "live",
            "production": "live",
        }
        normalized = alias.get(candidate, candidate)
        try:
            return EnvironmentMode(normalized)
        except ValueError:
            return EnvironmentMode.SIMULATED

    def _uses_real_telemetry(self) -> bool:
        """Return True when metrics are fetched from telemetry (HYBRID or LIVE)."""
        return self._mode in (EnvironmentMode.HYBRID, EnvironmentMode.LIVE)

    def _uses_real_executor(self) -> bool:
        """Return True when actions are forwarded to the executor (LIVE only)."""
        return self._mode == EnvironmentMode.LIVE

    def __init__(self):
        """Initialise environment metadata and the simulation core.

        The telemetry client, executor and validator are constructed
        unconditionally; whether they are used depends on the mode chosen at
        ``reset`` time.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._task_id: str = "task-1"
        self._mode: EnvironmentMode = EnvironmentMode.SIMULATED

        self._sim: ClusterSimulator = ClusterSimulator(n_nodes=N_NODES, task_id="task-1")
        self._telemetry = PrometheusClient()
        self._executor = KubernetesExecutor()
        self._validator = ActionValidator()
        self._observability = get_observability_tracker()
        self._logger = logging.getLogger("antiatropos.env")

        # Ground-truth and agent-facing node snapshots, plus the ground-truth
        # snapshot from the previous step (used for per-node deltas).
        self._nodes_true: list[dict] = []
        self._nodes_obs: list[dict] = []
        self._prev_nodes_true: list[dict] = []
        self._prev_lyapunov: float = 0.0
        self._sla_violations: int = 0

        # Bookkeeping about the most recent action and its reward breakdown.
        self._action_ack_status: str = "success"
        self._last_action_id: str = ""
        self._last_executor_latency_ms: float = 0.0
        self._last_executor_error_code: str = ""
        self._last_raw_reward: float = 0.0
        self._last_normalized_reward: float = 0.0
        self._last_reward_drift: float = 0.0
        self._last_reward_cost: float = 0.0
        self._last_reward_sla: float = 0.0
        self._last_reward_barrier: float = 0.0

        # Which reward is exposed as ``obs.reward``; unknown values fall back
        # to "normalized".
        self._reward_output_mode: str = os.getenv("ANTIATROPOS_REWARD_OUTPUT_MODE", "normalized").strip().lower()
        if self._reward_output_mode not in REWARD_OUTPUT_MODES:
            self._reward_output_mode = "normalized"
        self._last_metric_time: float = 0.0

    def reset(self, task_id: str = "task-1", mode: str | None = None, seed: int | None = None) -> ClusterObservation:
        """
        Start a fresh episode with a specific task profile and mode.

        Args:
            task_id: Task profile forwarded to the simulator.
            mode: Requested environment mode (see ``_parse_mode`` for how it
                is combined with the ``ANTIATROPOS_ENV_MODE`` variable).
            seed: Seed forwarded to the simulator's ``reset``.

        Returns:
            The initial observation of the new episode.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._task_id = task_id
        self._mode = self._parse_mode(mode)

        self._sla_violations = 0
        self._action_ack_status = "success"
        self._last_action_id = ""
        self._last_executor_latency_ms = 0.0
        self._last_executor_error_code = ""
        self._last_raw_reward = 0.0
        self._last_normalized_reward = 0.0
        # Clear per-step history as well, so the first observation of the new
        # episode does not report deltas or a reward breakdown computed from
        # the previous episode's final step.
        self._last_reward_drift = 0.0
        self._last_reward_cost = 0.0
        self._last_reward_sla = 0.0
        self._last_reward_barrier = 0.0
        self._prev_nodes_true = []
        # NOTE(review): the validator is not reset here while the tick passed
        # to it restarts at 1 each episode — confirm whether ActionValidator
        # keeps cooldown state that should be cleared between episodes.

        if self._uses_real_telemetry():
            self._last_metric_time = time.time()
        else:
            self._last_metric_time = 0.0

        self._sim.reset(task_id=task_id, seed=seed)

        # With real telemetry, align the freshly reset simulator with the
        # latest fetched metrics before taking the initial snapshots.
        if self._uses_real_telemetry():
            node_ids = [n["node_id"] for n in self._sim.state(for_agent=False)]
            metrics = self._telemetry.fetch_latest_metrics(node_ids)
            self._sim.reconcile_state(metrics)

        self._nodes_true = self._sim.state(for_agent=False)
        self._nodes_obs = self._sim.state(for_agent=True)
        self._prev_lyapunov = compute_lyapunov_graph(self._nodes_true, CLUSTER_TOPOLOGY)

        return self._build_observation()

    def step(self, action: SREAction) -> ClusterObservation:
        """
        Advance the simulation by one discrete time tick.

        The action is checked against the mode and the validator, optionally
        executed through the executor (LIVE), applied to the simulator, and
        the simulator is ticked regardless of whether the action was accepted.
        The reward is then computed from the new ground-truth state.

        Args:
            action: The management command issued by the agent.

        Returns:
            The observation after the tick, with ``done`` and ``reward`` set.
        """
        self._state.step_count += 1
        self._last_action_id = str(uuid4())
        self._last_executor_latency_ms = 0.0
        self._last_executor_error_code = ""
        self._last_raw_reward = 0.0
        self._last_normalized_reward = 0.0

        # --- 1. Mode gating and validation -------------------------------
        valid_targets = [n["node_id"] for n in self._nodes_true]
        is_enabled, mode_error = self._is_action_enabled_for_mode(action.action_type)
        if not is_enabled:
            self._action_ack_status = f"Rejected: {mode_error}"
            self._last_executor_error_code = ""
            is_valid = False
            error = mode_error
            cooldown_penalty = 0.0
        else:
            self._validator.set_tick(self._state.step_count)
            is_valid, error, cooldown_penalty = self._validator.validate(
                action.action_type,
                action.target_node_id,
                action.parameter,
                valid_targets=valid_targets,
            )

        apply_to_simulator = False
        had_effect = True

        # --- 2. Execution -------------------------------------------------
        if not is_valid:
            self._action_ack_status = f"Rejected: {error}"
            had_effect = False

            # Tag as a validation failure unless an error code is already set
            # or the message is a live-mode rejection.
            if not self._last_executor_error_code and not str(error).startswith("Live mode rejected"):
                self._last_executor_error_code = "VALIDATION_FAILED"

            self._sim.invalid_action_count += 1
        else:
            if self._uses_real_executor():
                exec_result = self._executor.execute_with_metadata(
                    action.action_type,
                    action.target_node_id,
                    action.parameter,
                )
                self._last_action_id = exec_result.get("action_id", self._last_action_id)
                self._action_ack_status = exec_result.get("ack_status", "Error: missing ack status")
                self._last_executor_latency_ms = float(exec_result.get("executor_latency_ms", 0.0))
                self._last_executor_error_code = str(exec_result.get("executor_error_code", ""))
                # Only mirror the action into the simulator when the executor
                # acknowledged it.
                apply_to_simulator = self._action_ack_status.startswith("Ack:")
            else:
                self._action_ack_status = "success (simulated)"
                apply_to_simulator = True

        if apply_to_simulator:
            had_effect = self._sim.apply_action(action)
            if not had_effect:
                # The simulator refused the action; count it as invalid.
                self._sim.invalid_action_count += 1
                self._action_ack_status = "Rejected: simulator rejected action"

        # --- 3. Advance time and refresh snapshots -----------------------
        self._sim.tick()

        if self._uses_real_telemetry():
            node_ids = [n["node_id"] for n in self._nodes_true]
            metrics = self._telemetry.fetch_latest_metrics(node_ids)
            self._sim.reconcile_state(metrics)
            self._last_metric_time = time.time()

        self._prev_nodes_true = self._nodes_true
        self._nodes_true = self._sim.state(for_agent=False)
        self._nodes_obs = self._sim.state(for_agent=True)

        # --- 4. SLA accounting -------------------------------------------
        avg_latency_norm = min(1.0, max(0.0, self._avg_latency(self._nodes_true) / MAX_LATENCY_NORM))
        error_rate = self._error_rate(self._nodes_true)
        sla_penalty_step = smooth_sla_penalty(avg_latency_norm, error_rate)

        if avg_latency_norm > 0.20 or error_rate > 0.05:
            self._sla_violations += 1

        # --- 5. Reward ----------------------------------------------------
        current_lyapunov = compute_lyapunov_graph(self._nodes_true, CLUSTER_TOPOLOGY)

        cost = self._compute_cost(self._nodes_true)
        barrier = compute_barrier(self._nodes_true)
        raw_reward = compute_reward(
            v_prev=self._prev_lyapunov,
            v_curr=current_lyapunov,
            cost=cost,
            sla_violation_step=sla_penalty_step,
            alpha=ALPHA,
            beta=BETA,
            gamma=GAMMA,
            barrier=barrier,
            delta=DELTA,
        )
        normalized_reward = normalize_reward(raw_reward)

        # NOTE(review): both penalties below adjust only the normalised
        # reward; in "raw" output mode they are not reflected — confirm this
        # is intended.
        if cooldown_penalty > 0:
            normalized_reward = max(0.0, normalized_reward - cooldown_penalty * 0.1)

        if not had_effect:
            normalized_reward = max(0.0, normalized_reward - 0.05)
        reward = normalized_reward if self._reward_output_mode == "normalized" else raw_reward
        self._last_raw_reward = raw_reward
        self._last_normalized_reward = normalized_reward

        # Per-term breakdown exposed on the observation.
        delta_v = compute_drift(self._prev_lyapunov, current_lyapunov)
        barrier_norm = barrier / BARRIER_NORM_SCALE if BARRIER_NORM_SCALE > 0 else barrier
        self._last_reward_drift = -(ALPHA * delta_v)
        self._last_reward_cost = -(BETA * cost)
        self._last_reward_sla = -(GAMMA * sla_penalty_step)
        self._last_reward_barrier = -(DELTA * barrier_norm)

        self._prev_lyapunov = current_lyapunov

        # --- 6. Termination ----------------------------------------------
        done = (
            self._state.step_count >= MAX_STEPS
            or all(n["status"] == NodeStatus.FAILED for n in self._nodes_true)
        )

        # --- 7. Observation, metrics and structured log ------------------
        obs = self._build_observation()
        obs.done = done
        obs.reward = reward

        self._observability.record_step(
            task_id=self._task_id,
            mode=str(self._mode.value),
            action_type=str(action.action_type.value),
            target_node_id=str(action.target_node_id),
            ack_status=self._action_ack_status,
            reward_output=reward,
            reward_raw=raw_reward,
            reward_normalized=normalized_reward,
            lyapunov_energy=obs.lyapunov_energy,
            total_queue_backlog=obs.total_queue_backlog,
            average_latency_ms=obs.average_latency_ms,
            executor_latency_ms=self._last_executor_latency_ms,
            executor_error_code=self._last_executor_error_code,
        )

        # Skip building and serialising the payload when INFO is disabled.
        if self._logger.isEnabledFor(logging.INFO):
            self._logger.info(
                json.dumps(
                    {
                        "event": "antiatropos_step",
                        "episode_id": self._state.episode_id,
                        "task_id": self._task_id,
                        "mode": self._mode.value,
                        "step": self._state.step_count,
                        "action_type": action.action_type.value,
                        "target_node_id": action.target_node_id,
                        "parameter": float(action.parameter),
                        "action_id": self._last_action_id,
                        "action_ack_status": self._action_ack_status,
                        "executor_latency_ms": self._last_executor_latency_ms,
                        "executor_error_code": self._last_executor_error_code,
                        "reward_output": reward,
                        "reward_raw": raw_reward,
                        "reward_normalized": normalized_reward,
                        "reward_output_mode": self._reward_output_mode,
                        "lyapunov_energy": obs.lyapunov_energy,
                        "average_latency_ms_norm": obs.average_latency_ms,
                        "total_queue_backlog_norm": obs.total_queue_backlog,
                        "error_rate": obs.error_rate,
                        "done": done,
                    }
                )
            )

        return obs

    @property
    def state(self) -> State:
        """Episode metadata (episode id and step count)."""
        return self._state

    def _is_action_enabled_for_mode(self, action_type: str) -> tuple[bool, str]:
        """Check whether ``action_type`` may be used in the current mode.

        Accepts either a plain string or an enum-like object with ``.value``.

        Returns:
            ``(True, "Enabled")`` when allowed, otherwise ``(False, reason)``.
            In LIVE mode the reason comes from the executor's
            ``live_capability_error``.
        """
        if hasattr(action_type, "value"):
            action = str(action_type.value)
        else:
            action = str(action_type)

        if self._mode in (EnvironmentMode.SIMULATED, EnvironmentMode.HYBRID):
            return True, "Enabled"

        if self._mode == EnvironmentMode.LIVE:
            capability_error = self._executor.live_capability_error(action)
            if capability_error:
                return False, capability_error
            return True, "Enabled"

        return False, f"Unsupported environment mode: {self._mode}"

    def _compute_cost(self, nodes_true: list[dict]) -> float:
        """Three-tier cost model calibrated to DEFAULT_CAPACITY as the baseline.

        Tier 1 — Baseline capacity (up to DEFAULT_CAPACITY): base rate.
        Tier 2 — Justified excess (active units above DEFAULT_CAPACITY that
            are within 'needed', plus pending units above the baseline):
            4× base rate.
        Tier 3 — Idle excess (active units above both the baseline and
            'needed'): OVERPROVISION_COST_PER_UNIT.

        Pending capacity is never charged at Tier 3: pending units fill any
        remaining baseline slots at the base rate and are otherwise charged
        at Tier 2.

        'needed' = max(1, ceil(incoming_rate / 15)) — minimum ACTIVE units to
        serve traffic. FAILED nodes and nodes with no capacity cost nothing.

        Args:
            nodes_true: Ground-truth node dicts from the simulator.

        Returns:
            Total cost summed over all billable nodes.
        """
        total_cost = 0.0
        baseline_cap = int(DEFAULT_CAPACITY)
        for node in nodes_true:
            if node["status"] == NodeStatus.FAILED:
                continue
            active = int(node.get("capacity_units", 0))
            pending = int(node.get("pending_capacity_units", 0))
            if active + pending <= 0:
                continue
            incoming = float(node.get("incoming_request_rate", 0.0))
            needed = max(1, int(math.ceil(incoming / 15.0)))

            if active <= baseline_cap:
                total_cost += active * COST_PER_CAPACITY_UNIT_PER_HOUR
            else:
                total_cost += baseline_cap * COST_PER_CAPACITY_UNIT_PER_HOUR
                above_baseline = active - baseline_cap
                # Cap at the units that actually exist above the baseline: an
                # under-provisioned node (needed > active) must not be billed
                # for capacity it does not have.
                justified = min(above_baseline, max(0, needed - baseline_cap))
                idle = above_baseline - justified

                total_cost += justified * (COST_PER_CAPACITY_UNIT_PER_HOUR * 4.0)
                total_cost += idle * OVERPROVISION_COST_PER_UNIT

            if pending > 0:
                # Pending units first fill whatever baseline slots the active
                # units left free; the remainder is charged at Tier 2.
                baseline_remaining = max(0, baseline_cap - active)
                pending_at_baseline = min(pending, baseline_remaining)
                pending_above = pending - pending_at_baseline
                total_cost += pending_at_baseline * COST_PER_CAPACITY_UNIT_PER_HOUR
                total_cost += pending_above * (COST_PER_CAPACITY_UNIT_PER_HOUR * 4.0)

        return total_cost

    def _avg_latency(self, nodes: list[dict]) -> float:
        """Computes importance-weighted mean latency across the cluster.

        Each node's latency is capped at MAX_LATENCY_NORM before averaging.
        Returns ``inf`` when there are no nodes or the total weight is not
        positive.
        """
        if not nodes:
            return float("inf")

        weighted_latency = 0.0
        total_weight = 0.0
        for n in nodes:
            weight = float(n.get("importance_weight", 1.0))
            latency = min(float(n["latency_ms"]), MAX_LATENCY_NORM)
            weighted_latency += weight * latency
            total_weight += weight

        if total_weight <= 0:
            return float("inf")
        return weighted_latency / total_weight

    def _error_rate(self, nodes: list[dict]) -> float:
        """Calculates an importance-weighted fraction of dropped or lost requests.

        Returns 0.0 when the weighted incoming rate is not positive; the
        result is capped at 1.0.
        """
        total_incoming = sum(
            float(n.get("incoming_request_rate", 0.0)) * float(n.get("importance_weight", 1.0))
            for n in nodes
        )
        if total_incoming <= 0:
            return 0.0
        total_drops = sum(
            float(n.get("dropped_requests", 0.0)) * float(n.get("importance_weight", 1.0))
            for n in nodes
        )
        return min(1.0, total_drops / total_incoming)

    def _vip_failure_count(self, nodes: list[dict]) -> int:
        """Counts failed VIP nodes for reporting and diagnostics."""
        return sum(1 for n in nodes if n.get("is_vip") and n["status"] == NodeStatus.FAILED)

    def _build_observation(self) -> ClusterObservation:
        """Assembles the ClusterObservation from the current observed simulator state.

        Per-node fields are scaled by the MAX_*_NORM constants and clamped.
        Cluster-level aggregates (latency, error rate, cost, active nodes)
        are computed from the ground-truth snapshot; the total backlog is
        computed from the agent-facing snapshot. ``done`` and ``reward`` are
        left at their defaults here and overwritten by ``step``.
        """
        prev_by_id: dict[str, dict] = {n["node_id"]: n for n in self._prev_nodes_true}

        # Build lookups once instead of rescanning per node. ``setdefault``
        # keeps the first entry for an id, matching a first-match scan.
        true_by_id: dict[str, dict] = {}
        for t in self._nodes_true:
            true_by_id.setdefault(t["node_id"], t)

        # Reverse topology: child id -> parent ids, in topology order.
        parents_by_child: dict[str, list[str]] = {}
        for pid, children in CLUSTER_TOPOLOGY.items():
            for child in dict.fromkeys(children):
                parents_by_child.setdefault(child, []).append(pid)

        node_obs = []
        for n in self._nodes_obs:
            node_id = n["node_id"]
            true_n = true_by_id.get(node_id, n)
            prev_n = prev_by_id.get(node_id)
            weight = float(n.get("importance_weight", 1.0))

            if prev_n:
                prev_q = float(prev_n.get("queue_depth", 0))

                # NOTE(review): the delta compares the *observed* queue with
                # the previous *ground-truth* queue — confirm this is intended.
                queue_delta_raw = float(n["queue_depth"]) - prev_q
                queue_delta = max(-1.0, min(1.0, queue_delta_raw / MAX_QUEUE_NORM))

                # Per-node reward signal from ground-truth queue drift, a
                # quadratic barrier above a queue of 150 and the node's
                # base-rate capacity cost, scaled into [-1, 0].
                curr_q = float(true_n["queue_depth"])
                node_drift = weight * (curr_q ** 2 - prev_q ** 2)
                node_barrier = max(0, curr_q - 150.0) ** 2
                node_cost = float(true_n.get("capacity_units", 0)) * COST_PER_CAPACITY_UNIT_PER_HOUR
                node_reward_raw = -(ALPHA * node_drift + DELTA * (node_barrier / 10000.0) + BETA * node_cost)
                node_reward_val = max(-1.0, min(0.0, node_reward_raw / 10.0))
            else:
                # No previous snapshot (e.g. first observation of an episode).
                queue_delta = 0.0
                node_reward_val = 0.0

            # 0.20 matches the normalised latency threshold that step() uses
            # to count an SLA violation; proximity saturates at 1.0 there.
            node_latency_norm = min(1.0, max(0.0, float(n["latency_ms"]) / MAX_LATENCY_NORM))
            sla_prox = max(0.0, min(1.0, node_latency_norm / 0.20))

            node_upstreams = list(parents_by_child.get(node_id, ()))
            # Copy so the observation never aliases the module-level topology.
            node_downstreams = list(CLUSTER_TOPOLOGY.get(node_id, []))

            # Mean normalised queue of the parents, read from the previous
            # step's ground-truth snapshot.
            parent_queues = [
                min(1.0, max(0.0, float(prev_by_id.get(pid, {}).get("queue_depth", 0)) / MAX_QUEUE_NORM))
                for pid in node_upstreams
            ]
            upstream_pressure = sum(parent_queues) / len(parent_queues) if parent_queues else 0.0

            node_obs.append(NodeObservation(
                node_id=node_id,
                status=n["status"],
                queue_depth=min(1.0, max(0.0, float(n["queue_depth"]) / MAX_QUEUE_NORM)),
                latency_ms=node_latency_norm,
                incoming_request_rate=min(1.0, max(0.0, float(n["incoming_request_rate"]) / MAX_REQUEST_RATE_NORM)),
                cpu_utilization=min(1.0, max(0.0, float(n["cpu_utilization"]))),
                is_vip=bool(n.get("is_vip", False)),
                importance_weight=weight,
                capacity=float(n.get("capacity_units", 0)) / 5.0,
                pending_capacity=float(n.get("pending_capacity_units", 0)) / 5.0,
                queue_delta=queue_delta,
                sla_proximity=sla_prox,
                outflow_rate=min(1.0, float(n.get("outflow_rate", 0.0)) / MAX_REQUEST_RATE_NORM),
                upstream_nodes=node_upstreams,
                downstream_nodes=node_downstreams,
                upstream_pressure=upstream_pressure,
                node_reward=node_reward_val,
                done=False,
                reward=0.0,
            ))

        # Age of the last telemetry fetch in ms; 0 when no fetch time is
        # recorded (simulated mode).
        freshness = int((time.time() - self._last_metric_time) * 1000) if self._last_metric_time > 0 else 0

        return ClusterObservation(
            cluster_id=self._state.episode_id,
            task_id=self._task_id,
            mode=self._mode,
            active_nodes=sum(1 for n in self._nodes_true if n["status"] != NodeStatus.FAILED),
            average_latency_ms=min(1.0, max(0.0, self._avg_latency(self._nodes_true) / MAX_LATENCY_NORM)),
            error_rate=self._error_rate(self._nodes_true),
            total_queue_backlog=min(1.0, max(0.0, sum(float(n["queue_depth"]) for n in self._nodes_obs) / (N_NODES * MAX_QUEUE_NORM))),
            current_cost_per_hour=self._compute_cost(self._nodes_true),
            lyapunov_energy=self._prev_lyapunov,
            nodes=node_obs,
            step=self._state.step_count,
            max_steps=MAX_STEPS,
            sla_violations=self._sla_violations,
            invalid_action_count=self._sim.invalid_action_count,
            vip_failure_count=self._vip_failure_count(self._nodes_true),
            metric_timestamp=self._last_metric_time,
            data_freshness_ms=freshness,
            action_ack_status=self._action_ack_status,
            action_id=self._last_action_id,
            executor_latency_ms=self._last_executor_latency_ms,
            executor_error_code=self._last_executor_error_code,
            raw_reward=self._last_raw_reward,
            normalized_reward=self._last_normalized_reward,
            reward_scale_version=REWARD_SCALE_VERSION,
            reward_drift=self._last_reward_drift,
            reward_cost=self._last_reward_cost,
            reward_sla=self._last_reward_sla,
            reward_barrier=self._last_reward_barrier,
            choke_level=0.0,
            done=False,
            reward=0.0,
        )
|
|
|
|