AntiAtropos / grader.py
div18
feat(grader): add task-aware grading with new metrics and composite scoring
e012e73
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
AntiAtropos Grading Logic β€” Evaluation Engine.
Provides standard scoring for hackathon submissions across multiple dimensions:
1. Uptime: Fraction of ticks where cluster-wide SLA (latency/errors) was met.
2. Cost: Normalized efficiency score based on provisioned capacity.
3. Stability: Mean Lyapunov energy normalized to a target baseline.
4. Recovery Speed (task-2): Ticks from node failure to child queue recovery.
5. VIP Protection (task-3): Whether node-0 stayed healthy during surge.
6. Action Efficiency: Fraction of actions that had measurable effect.
"""
import math
from collections import Counter
from typing import Dict, Any, List, Optional
try:
from .models import ClusterObservation
from .simulator import CLUSTER_TOPOLOGY
from .stability import lyapunov_variance as _lyapunov_variance
except ImportError:
from models import ClusterObservation # type: ignore
from simulator import CLUSTER_TOPOLOGY # type: ignore
from stability import lyapunov_variance as _lyapunov_variance # type: ignore
# ---------------------------------------------------------------------------
# SLA thresholds (must match environment.py)
# ---------------------------------------------------------------------------
SLA_LATENCY_MS: float = 0.20 # Normalized (200ms / 1000ms)
SLA_ERROR_RATE: float = 0.05
# ---------------------------------------------------------------------------
# Cost calibration
# ---------------------------------------------------------------------------
# Baseline cost = all 10 nodes at default capacity 3 with $0.05 / capacity-unit.
# 10 * 3 * 0.05 = $1.50 / hr. This is what a perfectly provisioned agent pays.
BASELINE_COST_PER_HOUR: float = 1.50
MIN_COST_PER_HOUR: float = 0.05 # 1 active node at min capacity 1
MAX_COST_PER_HOUR: float = 25.00 # 10 nodes at ~50 capacity units (overprovisioned blow-out)
# Exponential cost penalty harshness β€” higher = steeper curve
COST_PENALTY_K: float = 3.0
# ---------------------------------------------------------------------------
# Stability normalisation
# ---------------------------------------------------------------------------
# Energy reference point for stability scoring (Task 1 baseline).
# This is a calibration midpoint, not a hard "full score" cutoff.
TARGET_ENERGY: float = 2000.0
# Curvature for stability scoring:
# score = 1 / (1 + (avg_energy / TARGET_ENERGY)^STABILITY_CURVE_POWER)
STABILITY_CURVE_POWER: float = 2.0
# ---------------------------------------------------------------------------
# Recovery speed (task-2) β€” normalized queue depth threshold [0, 1]
# ---------------------------------------------------------------------------
RECOVERY_QUEUE_CLEAR_THRESHOLD: float = 0.10
RECOVERY_SPEED_CAP: int = 10 # ticks; recovery_score = max(0, 1 - speed/CAP)
# ---------------------------------------------------------------------------
# VIP protection (task-3) β€” surge detection threshold
# ---------------------------------------------------------------------------
SURGE_INCOMING_THRESHOLD: float = 0.60 # normalized incoming_request_rate > 0.6
# ---------------------------------------------------------------------------
# Action record type
# ---------------------------------------------------------------------------
class ActionRecord:
"""A single action taken by the agent during an episode."""
__slots__ = ("action_type", "target_node_id", "parameter", "had_effect")
def __init__(self, action_type: str, target_node_id: str,
parameter: float, had_effect: bool):
self.action_type = action_type
self.target_node_id = target_node_id
self.parameter = parameter
self.had_effect = had_effect
# ---------------------------------------------------------------------------
# Grade with task-aware composite
# ---------------------------------------------------------------------------
class Grade:
# Task-specific weight profiles for composite computation
TASK_WEIGHTS: Dict[str, Dict[str, float]] = {
"task-1": {"uptime": 0.4, "stability": 0.2, "cost": 0.4},
"task-2": {"uptime": 0.25, "stability": 0.15, "cost": 0.25, "recovery": 0.35},
"task-3": {"uptime": 0.35, "stability": 0.15, "cost_weighted": 0.35, "vip_protection": 0.15},
}
def __init__(self, task_id: str, scores: Dict[str, float]):
self.task_id = task_id
self.scores = scores
@property
def composite(self) -> float:
"""
Weighted composite score using task-specific weight profiles.
Task-1: 0.4*uptime + 0.2*stability + 0.4*cost
Task-2: 0.25*uptime + 0.15*stability + 0.25*cost + 0.35*recovery
(falls back to task-1 weights if recovery_speed is NaN)
Task-3: 0.35*uptime + 0.15*stability + 0.35*cost_weighted + 0.15*vip_protection
(cost_weighted = cost if uptime >= 0.5 else 0.0)
Additional modifiers:
- Invalid Action Penalty: -0.05 per forbidden command
- Episode bonuses: +0.10 if zero VIP failures, +0.05 if <3 SLA violations,
+0.05 if no invalid actions
"""
uptime = self.scores["uptime"]
stability = self.scores["stability"]
cost = self.scores["cost"]
invalid_penalty = self.scores.get("invalid_actions", 0) * 0.05
# Episode-level prevention bonuses (NOT in step reward to avoid double-counting)
bonus = 0.0
if self.scores.get("vip_failure_count", 0) == 0:
bonus += 0.10 # Zero VIP failures all episode
if self.scores.get("violations", 0) < 3:
bonus += 0.05 # Very few SLA violations all episode
if self.scores.get("invalid_actions", 0) == 0:
bonus += 0.05 # Clean actions all episode
# Select weight profile by task
weights = self.TASK_WEIGHTS.get(self.task_id, self.TASK_WEIGHTS["task-1"])
if self.task_id == "task-2":
recovery_speed = self.scores.get("recovery_speed")
if recovery_speed is not None and not math.isnan(recovery_speed):
recovery_score = max(0.0, 1.0 - recovery_speed / RECOVERY_SPEED_CAP)
score = (
weights.get("uptime", 0.25) * uptime
+ weights.get("stability", 0.15) * stability
+ weights.get("cost", 0.25) * cost
+ weights.get("recovery", 0.35) * recovery_score
)
else:
# Fallback: no failure triggered this seed, use task-1 weights
score = 0.4 * uptime + 0.2 * stability + 0.4 * cost
elif self.task_id == "task-3":
cost_weight = 1.0 if uptime >= 0.5 else 0.0
cost_weighted = cost * cost_weight
vip_protection = self.scores.get("vip_protection", 0.0)
score = (
weights.get("uptime", 0.35) * uptime
+ weights.get("stability", 0.15) * stability
+ weights.get("cost_weighted", 0.35) * cost_weighted
+ weights.get("vip_protection", 0.15) * vip_protection
)
else:
score = (
weights.get("uptime", 0.4) * uptime
+ weights.get("stability", 0.2) * stability
+ weights.get("cost", 0.4) * cost
)
return max(0.0, min(1.0, score - invalid_penalty + bonus))
def summary(self) -> str:
s = self.scores
parts = [
f"[{self.task_id}] composite={self.composite:.3f}",
f"uptime={s['uptime']:.3f}",
f"cost={s['cost']:.3f}",
f"stability={s['stability']:.3f}",
f"SLA_violations={int(s['violations'])}",
]
if s.get("invalid_actions", 0) > 0:
parts.append(f"INVALID={int(s['invalid_actions'])}")
if "recovery_speed" in s and s["recovery_speed"] is not None and not math.isnan(s["recovery_speed"]):
parts.append(f"recovery={s['recovery_speed']:.0f}ticks")
if "vip_protection" in s:
parts.append(f"vip_prot={s['vip_protection']:.1f}")
if "action_efficiency" in s:
parts.append(f"eff={s['action_efficiency']:.2f}")
return " | ".join(parts)
# ---------------------------------------------------------------------------
# Episode Grader β€” collects observations + actions, computes Grade
# ---------------------------------------------------------------------------
class EpisodeGrader:
"""Consumes observations and actions from an environment episode to produce a grade."""
def __init__(self, task_id: str = "task-1"):
self.task_id = task_id
self._records: List[Dict[str, Any]] = []
self._action_records: List[ActionRecord] = []
self._lyapunov_history: List[float] = []
def record(self, observation: ClusterObservation) -> None:
"""Add a step's telemetry to the grading buffer."""
obs_dict = observation.model_dump()
self._records.append(obs_dict)
lyap_val = obs_dict.get("lyapunov_energy", 0.0)
self._lyapunov_history.append(float(lyap_val))
def record_action(self, action_type: str, target_node_id: str,
parameter: float, had_effect: bool) -> None:
"""Record an action taken by the agent for efficiency analysis."""
self._action_records.append(
ActionRecord(action_type, target_node_id, parameter, had_effect)
)
# ── New metric computation methods ─────────────────────────────────────
def _compute_action_efficiency(self) -> float:
"""Fraction of actions that had measurable effect. Range [0, 1]."""
if not self._action_records:
return 1.0 # No actions = trivially efficient
effective = sum(1 for a in self._action_records if a.had_effect)
return effective / len(self._action_records)
def _compute_action_distribution(self) -> Dict[str, int]:
"""Count of each ActionType across the episode."""
return dict(Counter(a.action_type for a in self._action_records))
def _compute_node_heatmap(self) -> Dict[str, Dict[str, int]]:
"""Count of actions per node, grouped by action type."""
heatmap: Dict[str, Dict[str, int]] = {}
for a in self._action_records:
by_type = heatmap.setdefault(a.action_type, {})
by_type[a.target_node_id] = by_type.get(a.target_node_id, 0) + 1
return heatmap
def _compute_lyapunov_variance(self) -> float:
"""Variance of Lyapunov energy across the episode."""
if len(self._lyapunov_history) < 2:
return 0.0
return _lyapunov_variance(self._lyapunov_history)
def _compute_recovery_speed(self) -> Optional[float]:
"""
Ticks from first FAILED node to when all its children have
queue_depth < RECOVERY_QUEUE_CLEAR_THRESHOLD (normalized).
Returns None if no failure occurred (NaN sentinel).
"""
if self.task_id != "task-2":
return None
# Find first tick with a FAILED node
t_fail: Optional[int] = None
failed_node_id: Optional[str] = None
for tick_idx, rec in enumerate(self._records):
for node in rec.get("nodes", []):
status = str(node.get("status", ""))
if status == "FAILED":
t_fail = tick_idx
failed_node_id = node.get("node_id")
break
if t_fail is not None:
break
if t_fail is None or failed_node_id is None:
return float("nan") # No failure in this seed
# Get children of the failed node from DAG topology
children = CLUSTER_TOPOLOGY.get(failed_node_id, [])
if not children:
# Leaf node failed β€” no children to starve, recovery = immediate
return 0.0
# Find first tick after failure where ALL children have cleared queues
for tick_idx in range(t_fail, len(self._records)):
rec = self._records[tick_idx]
all_clear = True
for node in rec.get("nodes", []):
if node.get("node_id") in children:
if float(node.get("queue_depth", 1.0)) >= RECOVERY_QUEUE_CLEAR_THRESHOLD:
all_clear = False
break
if all_clear:
return float(tick_idx - t_fail)
# Never recovered within the episode
return float(len(self._records) - t_fail)
def _compute_cost_trajectory(self) -> float:
"""
Linear regression slope of cost over time.
Negative = agent reduced cost (good). Positive = cost climbing.
"""
costs = [r.get("current_cost_per_hour", 0.0) for r in self._records]
n = len(costs)
if n < 2:
return 0.0
mean_t = (n - 1) / 2.0
mean_c = sum(costs) / n
cov = sum((i - mean_t) * (costs[i] - mean_c) for i in range(n))
var_t = sum((i - mean_t) ** 2 for i in range(n))
if var_t == 0:
return 0.0
return cov / var_t
def _compute_peak_queue_sum(self) -> float:
"""Maximum total_queue_backlog observed across the episode."""
return max((r.get("total_queue_backlog", 0.0) for r in self._records), default=0.0)
def _compute_vip_protection(self) -> float:
"""
Task-3 only: 1.0 if node-0 never hit FAILED or DEGRADED during
the surge window, else 0.0.
The task-3 surge adds ~60 req/tick directly to node-1 and node-2
via a side channel that bypasses node-0 (simulator direct_injections).
Node-0's own incoming_request_rate stays ~0.30 β€” well below any
threshold β€” so we detect the surge window from the nodes that
actually receive it (node-1, node-2) instead.
"""
if self.task_id != "task-3":
return 0.0
for rec in self._records:
# Detect surge window: node-1 or node-2 has elevated incoming
surge_active = False
for node in rec.get("nodes", []):
nid = node.get("node_id", "")
incoming = float(node.get("incoming_request_rate", 0.0))
if nid in ("node-1", "node-2") and incoming > SURGE_INCOMING_THRESHOLD:
surge_active = True
break
if not surge_active:
continue
# During surge: check if node-0 is unhealthy
for node in rec.get("nodes", []):
if node.get("node_id") == "node-0":
status = str(node.get("status", ""))
if status in ("FAILED", "DEGRADED"):
return 0.0
break
return 1.0
# ── Main scoring method ────────────────────────────────────────────────
def score(self) -> Grade:
"""Computes the final multi-dimensional performance grade."""
if not self._records:
return Grade(self.task_id, {
"uptime": 0, "cost": 0, "stability": 0, "violations": 0
})
n = len(self._records)
# ── 1. Uptime score ────────────────────────────────────────────────
# Note: We exclude the t=0 state from uptime if n > 1.
records_to_count = self._records[1:] if len(self._records) > 1 else self._records
n_steps = len(records_to_count)
sla_ok_steps = sum(
1 for r in records_to_count
if r.get("average_latency_ms", 0.0) <= SLA_LATENCY_MS
and r.get("error_rate", 0.0) <= SLA_ERROR_RATE
)
uptime_score = sla_ok_steps / n_steps
# Total cumulative SLA violations (use the last record's counter
# since environment.py tracks this cumulatively)
total_violations = self._records[-1].get("sla_violations", 0)
# ── 2. Cost score ──────────────────────────────────────────────────
# Computes efficiency relative to a 'perfectly provisioned' system.
avg_cost = sum(r.get("current_cost_per_hour", 0.0) for r in self._records) / n
# Exponential cost penalty: cost_score = exp(-k * over_provisioning_ratio)
# over_provisioning_ratio = (avg_cost - BASELINE) / BASELINE
# A perfectly provisioned agent (avg_cost == BASELINE) scores exp(0) = 1.0.
# An agent that doubles the baseline (massive SCALE_UP spam) scores
# exp(-3.0) β‰ˆ 0.05 β€” nearly zero cost contribution.
over_ratio = max(0.0, (avg_cost - BASELINE_COST_PER_HOUR) / BASELINE_COST_PER_HOUR)
cost_score = max(0.0, min(1.0, math.exp(-COST_PENALTY_K * over_ratio)))
# ── 3. Stability score ─────────────────────────────────────────────
# Smooth inverse-energy score with no early saturation.
# Avoids flattening diverse "good" policies into a perfect 1.0 bucket.
avg_energy = sum(r.get("lyapunov_energy", 0.0) for r in self._records) / n
if avg_energy <= 0:
stability_score = 1.0
else:
ratio = avg_energy / TARGET_ENERGY
stability_score = 1.0 / (1.0 + (ratio ** STABILITY_CURVE_POWER))
# ── 4. Invalid Action tracking ──────────────────────────────────────
total_invalid = self._records[-1].get("invalid_action_count", 0)
total_vip_failures = self._records[-1].get("vip_failure_count", 0)
# ── 5. New episode-level metrics ────────────────────────────────────
recovery_speed = self._compute_recovery_speed()
vip_protection = self._compute_vip_protection()
action_efficiency = self._compute_action_efficiency()
action_distribution = self._compute_action_distribution()
node_heatmap = self._compute_node_heatmap()
lyap_var = self._compute_lyapunov_variance()
cost_trajectory = self._compute_cost_trajectory()
peak_queue = self._compute_peak_queue_sum()
scores: Dict[str, float] = {
"uptime": uptime_score,
"cost": cost_score,
"stability": stability_score,
"violations": total_violations,
"invalid_actions": total_invalid,
"vip_failure_count": total_vip_failures,
"action_efficiency": action_efficiency,
"lyapunov_variance": lyap_var,
"cost_trajectory": cost_trajectory,
"peak_queue_sum": peak_queue,
}
# Only include recovery_speed for task-2 (NaN-safe)
if recovery_speed is not None:
scores["recovery_speed"] = recovery_speed
# Only include vip_protection for task-3
if self.task_id == "task-3":
scores["vip_protection"] = vip_protection
# Action distribution and node heatmap as non-float metadata
scores["action_distribution"] = action_distribution # type: ignore[assignment]
scores["node_heatmap"] = node_heatmap # type: ignore[assignment]
return Grade(self.task_id, scores)
def score_episode(task_id: str, observations: List[ClusterObservation]) -> Grade:
"""Helper for one-shot grading."""
grader = EpisodeGrader(task_id)
for obs in observations:
grader.record(obs)
return grader.score()