| """ |
| AntiAtropos Stability Layer — Phase 3. |
| |
| This module is the mathematical core of the Lyapunov-inspired reward signal. |
| It is intentionally a stateless collection of pure functions so it can be |
| swapped, extended, or unit-tested independently of the environment. |
| |
| Key concepts implemented |
| ------------------------ |
| 1. Lyapunov Energy V(s) = Σ Q_i² |
| The "potential energy" of the cluster. Zero means all queues are empty. |
| A rising V means the cluster is destabilising. |
| |
| 2. Lyapunov Drift ΔV(t) = V(s_t) − V(s_{t−1}) |
| The primary stabilising objective. The reward penalises positive drift. |
| Negative drift (energy decreasing) is "good" — the agent drove the system |
| toward the equilibrium. |
| |
| 3. Control-Barrier Function h_i(s) = max(0, Q_i − Q_max)² |
| A soft safety constraint. h_i > 0 only when node i has exceeded the |
| hard-queue-depth safety ceiling Q_max. Summing over all nodes gives the |
| total barrier violation penalty. |
| |
| 4. Neely Drift-Plus-Penalty (optional, advanced) |
| From Neely's Lyapunov optimisation framework: |
| |
| Δ(t) + V_weight · p(t) |
| |
| where: |
| Δ(t) = ΔV — the one-step Lyapunov drift. |
| p(t) = cost — the per-step penalty (infrastructure cost here). |
| V_weight — trade-off parameter: large V_weight → prioritise cost; |
| small V_weight → prioritise stability. |
| |
| Minimising this expression at every step produces a policy that is both |
| stable (queue-stable in the mean) and cost-efficient. |
| """ |

from __future__ import annotations

import math
import os
import statistics
from typing import Sequence


Q_BARRIER_MAX: float = 150.0
"""Queue depth above which the barrier function fires (hard safety zone).
Set higher than OVERLOAD_THRESHOLD (80) to give the agent time to react
before the barrier penalty kicks in."""

MAX_QUEUE_NORM: float = 200.0
"""Normalization divisor shared with environment.py. Matches FATAL_FAIL_THRESHOLD."""

BARRIER_NORM_SCALE: float = 10000.0
"""Normalization divisor for the barrier term.
The raw barrier H(s) = sum(max(0, Q_i - Q_max)^2) can produce very large values
(e.g. 5 nodes at Q=200 with Q_max=150 give 5 * 2500 = 12500). Without
normalization this dominates the reward; dividing by this scale keeps the
barrier term in the same order of magnitude as the other terms when
delta=0.005."""

STABILITY_WINDOW: int = 10
"""Number of ticks to look back when judging whether the system is
trend-stable (V is on a decreasing trajectory)."""


REWARD_NORM_MIDPOINT: float = float(os.getenv("ANTIATROPOS_REWARD_MIDPOINT", "-6.0"))
REWARD_NORM_TEMPERATURE: float = float(os.getenv("ANTIATROPOS_REWARD_TEMPERATURE", "2.0"))
REWARD_NORM_EPS: float = float(os.getenv("ANTIATROPOS_REWARD_EPS", "1e-8"))
REWARD_SCALE_VERSION: str = "sigmoid-v2"


def compute_lyapunov(nodes: list[dict]) -> float:
    """
    V(s) = Σ w_i · Q_i²

    Weighted sum of squared queue depths across all nodes, where w_i is the
    optional per-node ``importance_weight`` (default 1.0). This is the
    cluster's Lyapunov energy. Lower is more stable; zero means all queues
    are empty.

    Args:
        nodes: List of per-node state dicts (as returned by simulator.state()).
            Each dict must contain the key ``queue_depth`` and may contain
            ``importance_weight``.

    Returns:
        Scalar Lyapunov energy ≥ 0.
    """
    return float(
        sum(
            float(n.get("importance_weight", 1.0)) * (n["queue_depth"] ** 2)
            for n in nodes
        )
    )
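
# A minimal illustration (values invented for the example): two nodes with
# queue depths 3 and 4 and default importance weights give V = 9 + 16 = 25.
#
#     >>> compute_lyapunov([{"queue_depth": 3}, {"queue_depth": 4}])
#     25.0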


def compute_lyapunov_graph(
    nodes: list[dict],
    topology: dict[str, list[str]],
    edge_weight: float = 5.0,
) -> float:
    """
    V_graph(s) = Σ w_i·Q_i² + edge_weight · Σ_{(i,j)∈edges} |Q_i − Q_j|

    The edge term penalises flow imbalance between connected nodes.
    If node-0 is overloaded but node-1 (its child) is idle, the edge
    term fires even if node-1's individual energy contribution is zero.
    This gives the agent gradient signal to balance load across the graph,
    not just reduce individual node queues.

    With edge_weight=5.0 and max |Q_diff|=200 on 4 edges, the edge term
    adds ~4000 to the Lyapunov energy, or ~2-5% of the base energy at
    full overload: a meaningful secondary gradient without dominating.
    """
    node_map = {n["node_id"]: n for n in nodes}

    base_energy = compute_lyapunov(nodes)

    edge_penalty = 0.0
    for parent_id, children in topology.items():
        parent = node_map.get(parent_id)
        if not parent:
            continue
        for child_id in children:
            child = node_map.get(child_id)
            if not child:
                continue
            imbalance = abs(float(parent["queue_depth"]) - float(child["queue_depth"]))
            edge_penalty += imbalance

    return base_energy + edge_weight * edge_penalty
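
# Sketch of the edge term on an invented two-node topology: base energy is
# 10**2 + 4**2 = 116, the single edge contributes |10 - 4| = 6, so the total
# is 116 + 5.0 * 6 = 146.
#
#     >>> compute_lyapunov_graph(
#     ...     [{"node_id": "a", "queue_depth": 10}, {"node_id": "b", "queue_depth": 4}],
#     ...     topology={"a": ["b"]},
#     ... )
#     146.0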


def compute_drift(v_prev: float, v_curr: float) -> float:
    """
    ΔV(t) = V(s_t) − V(s_{t−1})

    One-step drift in Lyapunov energy.

    Negative drift → the agent moved the system toward a lower-energy state.
    Positive drift → the cluster is destabilising.

    Args:
        v_prev: Lyapunov energy at the *previous* tick.
        v_curr: Lyapunov energy at the *current* tick.

    Returns:
        Signed scalar drift value.
    """
    return v_curr - v_prev
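
# For example, energy falling from 100 to 80 yields a stabilising drift:
#
#     >>> compute_drift(100.0, 80.0)
#     -20.0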


def compute_barrier(nodes: list[dict], q_max: float = Q_BARRIER_MAX) -> float:
    """
    Control-Barrier Function (CBF) violation penalty.

    H(s) = Σ_i max(0, Q_i − Q_max)²

    This is zero when no node exceeds the ceiling and grows quadratically as
    queues enter the "hard danger zone" above Q_max. It can be added to the
    reward as an extra penalty for unsafe states.

    Args:
        nodes: Per-node state dicts (must contain ``queue_depth``).
        q_max: Safety ceiling for queue depth. Default: Q_BARRIER_MAX.

    Returns:
        Scalar barrier violation energy ≥ 0.
    """
    violation = 0.0
    for n in nodes:
        excess = n["queue_depth"] - q_max
        if excess > 0:
            violation += excess ** 2
    return violation
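
# A small illustration (numbers invented): with the default ceiling of 150,
# only the first node violates, contributing (160 - 150)**2 = 100.
#
#     >>> compute_barrier([{"queue_depth": 160}, {"queue_depth": 140}])
#     100.0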


def is_lyapunov_stable(
    v_history: Sequence[float],
    window: int = STABILITY_WINDOW,
) -> bool:
    """
    Return True if the Lyapunov energy has been on a non-increasing trend
    over the last ``window`` ticks.

    Uses a simple linear regression slope: if slope ≤ 0 the system is
    considered trend-stable.

    Args:
        v_history: Ordered sequence of Lyapunov energy values (oldest first).
        window: How many recent values to consider.

    Returns:
        True if the system is trend-stable, False otherwise.
    """
    recent = list(v_history[-window:])
    if len(recent) < 2:
        return True

    n = len(recent)
    xs = list(range(n))
    mean_x = (n - 1) / 2.0
    mean_y = statistics.mean(recent)

    num = sum((xs[i] - mean_x) * (recent[i] - mean_y) for i in range(n))
    den = sum((xs[i] - mean_x) ** 2 for i in range(n))

    if den == 0:
        return True

    slope = num / den
    return slope <= 0.0
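
# Trend check on invented trajectories: a steadily draining cluster is
# trend-stable, a steadily filling one is not.
#
#     >>> is_lyapunov_stable([50.0, 40.0, 30.0, 20.0, 10.0])
#     True
#     >>> is_lyapunov_stable([10.0, 20.0, 30.0, 40.0, 50.0])
#     False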


def lyapunov_variance(v_history: Sequence[float]) -> float:
    """
    Variance of the Lyapunov energy trajectory over an episode.

    Used by the grader as the primary stability metric: a lower variance
    means the agent kept the cluster in a consistently stable state, rather
    than allowing wild oscillations.

    Args:
        v_history: All per-tick V(s) values for the episode.

    Returns:
        Sample variance of the energy trajectory (``statistics.variance``),
        or 0.0 when fewer than two values are available.
    """
    if len(v_history) < 2:
        return 0.0
    return statistics.variance(v_history)
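
# For instance, a perfectly flat trajectory has zero variance, while the
# invented trajectory below has sample variance 1.0:
#
#     >>> lyapunov_variance([100.0, 100.0, 100.0])
#     0.0
#     >>> lyapunov_variance([1.0, 2.0, 3.0])
#     1.0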


def drift_plus_penalty(
    v_prev: float,
    v_curr: float,
    penalty_cost: float,
    V_weight: float = 1.0,
) -> float:
    """
    Neely's Drift-Plus-Penalty objective:

        DPP(t) = ΔV(t) + V_weight · p(t)

    where:
        ΔV(t)    = v_curr − v_prev (Lyapunov drift)
        p(t)     = penalty_cost (infrastructure cost this tick)
        V_weight = trade-off coefficient:
            large → agent optimises cost more aggressively,
            small → agent focuses on stability.

    Minimising this at each step produces a queue-stable policy with bounded
    average cost, the theoretical guarantee from Neely's framework.

    This function can substitute for the simpler ΔV term in the reward
    when you want to make the cost trade-off explicit and theoretically
    grounded (rather than the ad-hoc β·Cost term).

    Args:
        v_prev: Lyapunov energy at previous tick.
        v_curr: Lyapunov energy at current tick.
        penalty_cost: Per-step cost to penalise (e.g. current_cost_per_hour).
        V_weight: Trade-off weight V in Neely's framework.

    Returns:
        Scalar DPP value. The reward should negate this:
            R_t = −DPP(t) − γ·SLA_violation_step
    """
    delta_v = compute_drift(v_prev, v_curr)
    return delta_v + V_weight * penalty_cost
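
# Worked example (invented numbers): energy drops from 100 to 90 while the
# tick costs 4.0; with V_weight=0.5 the cost term only partially offsets the
# stabilising drift, so DPP = -10.0 + 0.5 * 4.0 = -8.0.
#
#     >>> drift_plus_penalty(100.0, 90.0, penalty_cost=4.0, V_weight=0.5)
#     -8.0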


def smooth_sla_penalty(
    avg_latency_norm: float,
    error_rate: float,
    latency_threshold: float = 0.20,
    error_threshold: float = 0.05,
    latency_temperature: float = 0.03,
    error_temperature: float = 0.01,
) -> float:
    """
    Smooth SLA penalty in [0, 1] that ramps up as metrics approach thresholds.

    Unlike the binary cliff (0 or 1), this gives the agent gradient signal
    BEFORE the SLA is actually violated, enabling preventive learning.

    Uses two sigmoids (one for latency, one for errors) and takes the max
    so whichever dimension is worse dominates.

    Args:
        avg_latency_norm: Normalized average latency [0, 1].
        error_rate: Cluster-wide error rate [0, 1].
        latency_threshold: Normalized latency SLA boundary.
        error_threshold: Error rate SLA boundary.
        latency_temperature: Sigmoid temperature for latency (lower = sharper).
        error_temperature: Sigmoid temperature for errors (lower = sharper).

    Returns:
        Smooth penalty in [0, 1]. Near 0 when safe, near 1 when violating.

    Raises:
        ValueError: If an input falls well outside [0, 1] (below -0.01 or
            above 1.5), indicating raw (non-normalized) values were passed
            by mistake. This is a common bug: passing latency in raw ms
            (e.g. 200.0) instead of normalized [0, 1] (e.g. 0.20).
    """
    if avg_latency_norm < -0.01 or avg_latency_norm > 1.5:
        raise ValueError(
            f"smooth_sla_penalty: avg_latency_norm={avg_latency_norm:.4f} is outside "
            f"expected [0, 1] range. Did you pass raw ms instead of normalized? "
            f"Divide by MAX_LATENCY_NORM before calling."
        )
    if error_rate < -0.01 or error_rate > 1.5:
        raise ValueError(
            f"smooth_sla_penalty: error_rate={error_rate:.4f} is outside "
            f"expected [0, 1] range."
        )
    lat_z = (avg_latency_norm - latency_threshold) / max(1e-8, latency_temperature)
    err_z = (error_rate - error_threshold) / max(1e-8, error_temperature)
    lat_penalty = 1.0 / (1.0 + math.exp(-lat_z))
    err_penalty = 1.0 / (1.0 + math.exp(-err_z))
    return max(lat_penalty, err_penalty)
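
# At exactly the latency threshold the latency sigmoid sits at its midpoint,
# so the penalty is 0.5 even with a clean error rate:
#
#     >>> smooth_sla_penalty(avg_latency_norm=0.20, error_rate=0.0)
#     0.5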


def compute_reward(
    v_prev: float,
    v_curr: float,
    cost: float,
    sla_violation_step: float = 0.0,
    alpha: float = 1.0,
    beta: float = 0.05,
    gamma: float = 2.0,
    barrier: float = 0.0,
    delta: float = 0.005,
) -> float:
    """
    R_t = −(alpha · ΔV + beta · Cost + gamma · SLA_smooth
            + delta · Barrier / BARRIER_NORM_SCALE)

    Convenience wrapper that mirrors the reward formula in environment.py.
    The raw barrier energy is divided by BARRIER_NORM_SCALE before weighting
    so it stays comparable to the other terms.

    Args:
        v_prev: Lyapunov energy at previous tick.
        v_curr: Lyapunov energy at current tick.
        cost: Infrastructure cost this tick (USD/hr).
        sla_violation_step: Smooth SLA penalty in [0, 1] (was binary 0/1).
        alpha: Weight on Lyapunov drift.
        beta: Weight on cost.
        gamma: Weight on SLA violations.
        barrier: Control-barrier function violation energy (raw, unnormalized).
        delta: Weight on barrier penalty.

    Returns:
        Scalar reward (higher is better). Once the system has settled
        (ΔV ≈ 0) the reward is ≤ 0, since cost and penalties are
        non-negative; it can be transiently positive while the agent is
        actively draining queues (ΔV < 0).
    """
    delta_v = compute_drift(v_prev, v_curr)

    barrier_normalized = barrier / BARRIER_NORM_SCALE if BARRIER_NORM_SCALE > 0 else barrier
    return -(alpha * delta_v + beta * cost + gamma * sla_violation_step + delta * barrier_normalized)
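
# Worked example (invented numbers): draining energy from 100 to 90 at a cost
# of 10 USD/hr, with no SLA or barrier pressure, nets a positive reward:
# -((-10.0) + 0.05 * 10.0) = 9.5.
#
#     >>> compute_reward(v_prev=100.0, v_curr=90.0, cost=10.0)
#     9.5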


def normalize_reward(
    raw_reward: float,
    midpoint: float = REWARD_NORM_MIDPOINT,
    temperature: float = REWARD_NORM_TEMPERATURE,
    eps: float = REWARD_NORM_EPS,
) -> float:
    """
    Deterministically map raw reward to [0, 1] using a smooth sigmoid:

        reward_01 = 1 / (1 + exp(−(raw_reward − midpoint) / temperature))

    Computed in a numerically stable form that never exponentiates a large
    positive argument.
    """
    temp = max(float(eps), abs(float(temperature)))
    z = (float(raw_reward) - float(midpoint)) / temp
    if z >= 0:
        exp_neg = math.exp(-z)
        out = 1.0 / (1.0 + exp_neg)
    else:
        exp_pos = math.exp(z)
        out = exp_pos / (1.0 + exp_pos)
    return min(1.0, max(0.0, float(out)))
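
# With an explicit midpoint and temperature (the module defaults can be
# overridden via environment variables), a raw reward at the midpoint maps
# to exactly 0.5:
#
#     >>> normalize_reward(-6.0, midpoint=-6.0, temperature=2.0)
#     0.5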