"""AtlasOps Evaluation — Base model vs Fine-tuned comparison. Runs N episodes on the live GKE cluster and reports: - Resolution rate per tier - Average reward (judge score + contract score) - MTTR distribution - Per-incident postmortem quality Usage: # Compare base vs fine-tuned python eval.py --base Qwen/Qwen2.5-7B-Instruct --ft checkpoints/grpo_v3 --episodes 20 # Eval a single model python eval.py --model checkpoints/grpo_v3 --episodes 30 --tiers cascade,named_replays # Quick smoke test (5 episodes) python eval.py --model Qwen/Qwen2.5-7B-Instruct --episodes 5 --quick """ import argparse import asyncio import json import logging import os import subprocess import time from datetime import datetime, timezone from pathlib import Path from config.runtime import EVAL_SCENARIOS_BY_TIER log = logging.getLogger(__name__) RESULTS_DIR = Path("bench/results/eval") MANIFESTS_DIR = Path("bench/chaos_manifests") def apply_chaos(scenario_id: str) -> bool: manifest = MANIFESTS_DIR / f"{scenario_id}.yaml" if not manifest.exists(): return False env = os.environ.copy() env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True" r = subprocess.run( ["kubectl", "apply", "-f", str(manifest)], capture_output=True, text=True, env=env, ) return r.returncode == 0 def reset_chaos(): env = os.environ.copy() env["USE_GKE_GCLOUD_AUTH_PLUGIN"] = "True" subprocess.run( ["kubectl", "delete", "podchaos,networkchaos,stresschaos,dnschaos,iochaos,timechaos", "--all", "-A", "--ignore-not-found=true"], capture_output=True, env=env, ) time.sleep(30) def wait_for_alert(timeout_s: int = 180) -> dict | None: from agents.tools.alertmanager import alertmanager_list_alerts deadline = time.time() + timeout_s while time.time() < deadline: result = alertmanager_list_alerts(active_only=True) if result.get("success") and result.get("count", 0) > 0: return {"commonLabels": {"alertname": result["alerts"][0]["alertname"]}, "alerts": result["alerts"]} time.sleep(10) return {"commonLabels": {"alertname": "EvalTimeout"}, "alerts": [], "synthetic": True} async def run_episode(scenario_id: str) -> dict: from agents.coordinator import handle_incident from agents.judge import judge_trajectory t0 = time.time() tier = scenario_id.split("/")[0] if not apply_chaos(scenario_id): return {"scenario_id": scenario_id, "status": "skip", "tier": tier} alert = wait_for_alert() alert["scenario_id"] = scenario_id try: incident = await handle_incident(alert) judge_score = await judge_trajectory(incident) except Exception as e: reset_chaos() return {"scenario_id": scenario_id, "status": "error", "error": str(e), "tier": tier} reset_chaos() remediation = incident.get("remediation", {}).get("final", {}) total_turns = sum( len(incident.get(r, {}).get("trajectory", [])) for r in ("triage", "diagnosis", "remediation", "comms") ) return { "scenario_id": scenario_id, "tier": tier, "status": "ok", "resolved": remediation.get("outcome") == "resolved", "outcome": remediation.get("outcome", "unknown"), "time_to_resolve_s": remediation.get("time_to_resolve_seconds", round(time.time() - t0)), "total_turns": total_turns, "judge": judge_score, "postmortem_path": incident.get("comms", {}).get("final", {}).get("postmortem_path"), } def reset_cluster(): reset_chaos() def compute_stats(results: list[dict], tag: str) -> dict: valid = [r for r in results if r.get("status") == "ok"] resolved = [r for r in valid if r.get("resolved")] judge_scores = [r["judge"].get("overall", 0) for r in valid if r.get("judge")] ttr_values = [r["time_to_resolve_s"] for r in valid if r.get("time_to_resolve_s")] per_tier: 
def compute_stats(results: list[dict], tag: str) -> dict:
    """Aggregate per-episode records into overall and per-tier metrics."""
    valid = [r for r in results if r.get("status") == "ok"]
    resolved = [r for r in valid if r.get("resolved")]
    judge_scores = [r["judge"].get("overall", 0) for r in valid if r.get("judge")]
    ttr_values = [r["time_to_resolve_s"] for r in valid if r.get("time_to_resolve_s")]

    per_tier: dict = {}
    for tier in ("single_fault", "cascade", "named_replays"):
        tier_eps = [r for r in valid if r.get("tier") == tier]
        tier_res = [r for r in tier_eps if r.get("resolved")]
        per_tier[tier] = {
            "total": len(tier_eps),
            "resolved": len(tier_res),
            "resolution_rate": round(len(tier_res) / max(len(tier_eps), 1), 3),
        }

    return {
        "tag": tag,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_episodes": len(results),
        "valid_episodes": len(valid),
        "resolution_rate": round(len(resolved) / max(len(valid), 1), 3),
        "avg_judge_score": round(sum(judge_scores) / max(len(judge_scores), 1), 3),
        "avg_ttr_seconds": round(sum(ttr_values) / max(len(ttr_values), 1), 1),
        "min_ttr_seconds": min(ttr_values) if ttr_values else None,
        "max_ttr_seconds": max(ttr_values) if ttr_values else None,
        "per_tier": per_tier,
    }


def print_comparison(base_stats: dict, ft_stats: dict):
    """Print a side-by-side metrics table for the base and fine-tuned runs."""
    print("\n" + "=" * 70)
    print(" ATLASOPS EVALUATION — BASE vs FINE-TUNED")
    print("=" * 70)
    print(f"\n{'Metric':<30} {'Base':>12} {'Fine-tuned':>12} {'Delta':>10}")
    print("-" * 66)
    metrics = [
        ("Resolution Rate", "resolution_rate", "{:.1%}"),
        ("Avg Judge Score", "avg_judge_score", "{:.3f}"),
        ("Avg TTR (seconds)", "avg_ttr_seconds", "{:.0f}s"),
    ]
    for label, key, fmt in metrics:
        b = base_stats.get(key, 0)
        f = ft_stats.get(key, 0)
        delta = f - b if isinstance(b, (int, float)) else 0
        sign = "+" if delta > 0 else ""
        print(f" {label:<28} {fmt.format(b):>12} {fmt.format(f):>12} {sign}{fmt.format(delta):>9}")

    print("\n Per-Tier Resolution Rate:")
    for tier in ("single_fault", "cascade", "named_replays"):
        b = base_stats.get("per_tier", {}).get(tier, {}).get("resolution_rate", 0)
        f = ft_stats.get("per_tier", {}).get(tier, {}).get("resolution_rate", 0)
        delta = f - b
        sign = "+" if delta > 0 else ""
        print(f" {tier:<26} {b:>10.1%} {f:>10.1%} {sign}{delta:>8.1%}")
    print("=" * 70 + "\n")
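# EVAL_SCENARIOS_BY_TIER (imported from config.runtime above) is assumed to map
# a tier name to the scenario ids in that tier, with each id matching a
# manifest under bench/chaos_manifests/. A minimal sketch; the scenario names
# below are hypothetical:
#
#   EVAL_SCENARIOS_BY_TIER = {
#       "single_fault": ["single_fault/pod_kill"],
#       "cascade": ["cascade/db_then_cache"],
#       "named_replays": ["named_replays/incident_0042"],
#   }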
async def eval_model(model_id: str, tag: str, scenarios: list[str], episodes: int) -> dict:
    """Point the agent stack at `model_id` and run up to `episodes` scenarios sequentially."""
    os.environ["AGENT_MODEL"] = model_id
    n = min(episodes, len(scenarios))
    if n < episodes:
        log.warning("Only %d scenarios available for %d requested episodes", len(scenarios), episodes)
    log.info("Evaluating %s (%d episodes)...", tag, n)
    results = []
    for i, scenario in enumerate(scenarios[:n], 1):
        log.info("[%d/%d] %s", i, n, scenario)
        result = await run_episode(scenario)
        results.append(result)
    return compute_stats(results, tag)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--base", default="", help="Base model path/ID")
    parser.add_argument("--ft", default="", help="Fine-tuned checkpoint path")
    parser.add_argument("--model", default="", help="Evaluate a single model instead of a base/ft comparison")
    parser.add_argument("--episodes", type=int, default=20)
    parser.add_argument("--tiers", default="single_fault,cascade,named_replays")
    parser.add_argument("--quick", action="store_true", help="5-episode smoke test")
    args = parser.parse_args()

    if args.quick:
        args.episodes = 5

    tiers = [t.strip() for t in args.tiers.split(",")]
    scenarios = []
    for tier in tiers:
        tier_scenarios = EVAL_SCENARIOS_BY_TIER.get(tier, [])
        if not tier_scenarios:
            log.warning("No scenarios configured for tier %r", tier)
        scenarios.extend(tier_scenarios)

    RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    if args.model:
        stats = await eval_model(args.model, "model", scenarios, args.episodes)
        print(json.dumps(stats, indent=2))
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        (RESULTS_DIR / f"eval_{ts}.json").write_text(json.dumps(stats, indent=2))
        return

    if not args.base or not args.ft:
        parser.error("Provide --base and --ft for comparison, or --model for single eval")

    base_stats = await eval_model(args.base, "base", scenarios, args.episodes)
    ft_stats = await eval_model(args.ft, "fine_tuned", scenarios, args.episodes)
    print_comparison(base_stats, ft_stats)

    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    (RESULTS_DIR / f"comparison_{ts}.json").write_text(
        json.dumps({"base": base_stats, "fine_tuned": ft_stats}, indent=2)
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
    asyncio.run(main())
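# Reading a saved comparison back (illustrative; the filename below is an
# example, but the keys match what main() writes):
#
#   import json, pathlib
#   data = json.loads(pathlib.Path("bench/results/eval/comparison_20250101_120000.json").read_text())
#   print(data["fine_tuned"]["per_tier"]["cascade"]["resolution_rate"])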