from __future__ import annotations import requests import json import statistics import math import os import random import time from typing import Any from datetime import datetime BASE = "http://localhost:7860" SEEDS = list(range(50)) TASKS = ["easy", "medium", "hard", "cascade", "fleet_coordination", "black_swan"] RESULTS_DIR = "results" MAX_STEPS_RANDOM = 15 PATCH_FILES = [ "model/transformer.py", "model/attention.py", "model/feedforward.py", "model/embedding.py", ] def _error_episode(task_id: str, seed: int, error: str) -> dict[str, Any]: return { "task_id": task_id, "seed": seed, "score": 0.01, "passed": False, "steps_taken": 0, "total_reward": 0.0, "mean_reward": 0.0, "token_efficiency_score": 0.0, "mer_score": 0.0, "time_to_solve": None, "failure_mode": "error", "breakdown": {}, "error": error, } def _post(path: str, payload: dict[str, Any], timeout: int = 15) -> dict[str, Any]: response = requests.post(f"{BASE}{path}", json=payload, timeout=timeout) response.raise_for_status() return response.json() def _grade(task_id: str) -> dict[str, Any]: return _post("/grade", {"task_id": task_id}, timeout=20) def _extract_failing_node(obs: dict[str, Any]) -> int: nodes = obs.get("nodes", []) for node in nodes: if node.get("health_status") == "failed": node_id = node.get("node_id") if isinstance(node_id, int): return node_id return 0 def _apply_step( action: dict[str, Any], step_rewards: list[float], last_token_eff: float, steps_taken: int, first_recovery_step: int | None, ) -> tuple[dict[str, Any], list[float], float, int, int | None]: step_result = _post("/step", action, timeout=20) reward_obj = step_result.get("reward", {}) reward_value = float(reward_obj.get("value", 0.0)) token_efficiency_score = float(reward_obj.get("token_efficiency_score", 0.0)) steps_taken += 1 step_rewards.append(reward_value) last_token_eff = token_efficiency_score observation = step_result.get("observation", {}) job_status = str(observation.get("training", {}).get("job_status", "")) if first_recovery_step is None and job_status == "recovered": first_recovery_step = steps_taken return step_result, step_rewards, last_token_eff, steps_taken, first_recovery_step def run_oracle_episode(task_id: str, seed: int) -> dict[str, Any]: """ Run a perfect oracle agent for given task and seed. Returns full episode metrics. """ try: obs = _post("/reset", {"task_id": task_id, "seed": seed}, timeout=20) steps_taken = 0 step_rewards: list[float] = [] last_token_efficiency = 0.0 first_recovery_step: int | None = None if task_id == "easy": failing_node = _extract_failing_node(obs) action = { "action_type": "inspect_flight_recorder", "parameters": {"rank_id": failing_node}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) elif task_id == "medium": action = { "action_type": "topo_reorder", "parameters": {"affinity": "rack"}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) for _ in range(6): action = {"action_type": "noop", "parameters": {}} _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) elif task_id == "hard": action = { "action_type": "query_nccl_logs", "parameters": {"time_window": 5}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) action = { "action_type": "inspect_flight_recorder", "parameters": {"rank_id": 0}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) selected_file = PATCH_FILES[0] for file_name in PATCH_FILES: action = { "action_type": "patch_divergent_code", "parameters": { "file": file_name, "fix_type": "identify_file", }, } step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) reward_info = str(step_result.get("reward", {}).get("info", "")).lower() if "stage 1" in reward_info or "stage 2" in reward_info or "stage 3" in reward_info: selected_file = file_name break action = { "action_type": "patch_divergent_code", "parameters": { "file": selected_file, "fix_type": "propose_diff", }, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) action = { "action_type": "patch_divergent_code", "parameters": { "file": selected_file, "fix_type": "synchronize_conditional", }, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) elif task_id == "cascade": failing_node = _extract_failing_node(obs) action = { "action_type": "inspect_flight_recorder", "parameters": {"rank_id": failing_node}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) action = { "action_type": "topo_reorder", "parameters": {"affinity": "rack"}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) action = { "action_type": "query_nccl_logs", "parameters": {"time_window": 5}, } _, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) for file_name in PATCH_FILES: action = { "action_type": "patch_divergent_code", "parameters": { "file": file_name, "fix_type": "synchronize_conditional", }, } step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) if float(step_result.get("reward", {}).get("value", 0.0)) > 0.1: break elif task_id == "fleet_coordination": failing_node = _extract_failing_node(obs) _post( "/delegate", { "worker": "log_inspector", "action": "inspect_flight_recorder", "parameters": {"rank_id": failing_node}, "supervisor_reasoning": "Confirm root-cause rank before remediation.", }, timeout=20, ) _post( "/delegate", { "worker": "version_checker", "action": "check_nccl_version", "parameters": {}, "supervisor_reasoning": "Confirm NCCL version before coalition action.", }, timeout=20, ) _post( "/coalition", { "proposing_worker": "topo_agent", "supporting_worker": "version_checker", "action": "topology_version_fix", "parameters": {}, "rationale": "Topology and version mismatch must be fixed jointly.", }, timeout=20, ) steps_taken = 3 first_recovery_step = 3 elif task_id == "black_swan": # Experimental stress task is intentionally unsolved by the short oracle. pass else: raise ValueError(f"Unsupported task_id: {task_id}") grade = _grade(task_id) score = float(grade.get("score", 0.01)) passed = bool(grade.get("passed", score >= 0.5)) breakdown = grade.get("breakdown", {}) or {} mer_score = float(breakdown.get("mer_score", 0.0)) explanation = str(grade.get("explanation", "")) total_reward = float(sum(step_rewards)) mean_reward = float(total_reward / len(step_rewards)) if step_rewards else 0.0 return { "task_id": task_id, "seed": seed, "score": score, "passed": passed, "steps_taken": steps_taken, "total_reward": total_reward, "mean_reward": mean_reward, "token_efficiency_score": float(last_token_efficiency), "mer_score": mer_score, "time_to_solve": first_recovery_step, "failure_mode": explanation, "breakdown": breakdown, } except Exception as exc: return _error_episode(task_id=task_id, seed=seed, error=str(exc)) def run_random_episode(task_id: str, seed: int) -> dict[str, Any]: """ Run a random agent (picks random valid action each step). Used as lower baseline floor. """ try: _post("/reset", {"task_id": task_id, "seed": seed}, timeout=20) rng = random.Random(seed) actions_pool = [ "inspect_flight_recorder", "query_nccl_logs", "topo_reorder", "patch_divergent_code", "noop", ] step_rewards: list[float] = [] last_token_efficiency = 0.0 steps_taken = 0 first_recovery_step: int | None = None for _ in range(MAX_STEPS_RANDOM): action_type = rng.choice(actions_pool) if action_type == "inspect_flight_recorder": action = { "action_type": action_type, "parameters": {"rank_id": rng.randint(0, 7)}, } elif action_type == "query_nccl_logs": action = { "action_type": action_type, "parameters": {"time_window": rng.randint(3, 10)}, } elif action_type == "topo_reorder": action = { "action_type": action_type, "parameters": {"affinity": rng.choice(["rack", "node"])}, } elif action_type == "patch_divergent_code": action = { "action_type": action_type, "parameters": { "file": rng.choice(PATCH_FILES), "fix_type": "synchronize_conditional", }, } else: action = {"action_type": "noop", "parameters": {}} step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step( action, step_rewards, last_token_efficiency, steps_taken, first_recovery_step, ) if bool(step_result.get("done", False)): break grade = _grade(task_id) score = float(grade.get("score", 0.01)) passed = bool(grade.get("passed", score >= 0.5)) breakdown = grade.get("breakdown", {}) or {} mer_score = float(breakdown.get("mer_score", 0.0)) explanation = str(grade.get("explanation", "")) total_reward = float(sum(step_rewards)) mean_reward = float(total_reward / len(step_rewards)) if step_rewards else 0.0 return { "task_id": task_id, "seed": seed, "score": score, "passed": passed, "steps_taken": steps_taken, "total_reward": total_reward, "mean_reward": mean_reward, "token_efficiency_score": float(last_token_efficiency), "mer_score": mer_score, "time_to_solve": first_recovery_step, "failure_mode": explanation, "breakdown": breakdown, } except Exception as exc: return _error_episode(task_id=task_id, seed=seed, error=str(exc)) def aggregate_results(episodes: list[dict]) -> dict[str, Any]: """ Aggregate episode results into summary statistics. """ if not episodes: return { "mean_score": 0.0, "std_score": 0.0, "pass_rate": 0.0, "mean_steps": 0.0, "std_steps": 0.0, "mean_total_reward": 0.0, "std_total_reward": 0.0, "mean_token_efficiency": 0.0, "mean_mer_score": 0.0, "mean_time_to_solve": 0.0, "solve_rate": 0.0, "score_ci_95": [0.0, 0.0], "n_episodes": 0, } scores = [float(ep.get("score", 0.0)) for ep in episodes] steps = [float(ep.get("steps_taken", 0.0)) for ep in episodes] total_rewards = [float(ep.get("total_reward", 0.0)) for ep in episodes] token_eff = [float(ep.get("token_efficiency_score", 0.0)) for ep in episodes] mer_scores = [float(ep.get("mer_score", 0.0)) for ep in episodes] passes = [bool(ep.get("passed", False)) for ep in episodes] solved_times = [ float(ep["time_to_solve"]) for ep in episodes if ep.get("time_to_solve") is not None ] mean_score = statistics.mean(scores) std_score = statistics.stdev(scores) if len(scores) > 1 else 0.0 pass_rate = sum(1 for passed in passes if passed) / len(passes) mean_steps = statistics.mean(steps) std_steps = statistics.stdev(steps) if len(steps) > 1 else 0.0 mean_total_reward = statistics.mean(total_rewards) std_total_reward = statistics.stdev(total_rewards) if len(total_rewards) > 1 else 0.0 mean_token_efficiency = statistics.mean(token_eff) mean_mer_score = statistics.mean(mer_scores) mean_time_to_solve = statistics.mean(solved_times) if solved_times else 0.0 solve_rate = len(solved_times) / len(episodes) rng = random.Random(42) boot_means: list[float] = [] for _ in range(1000): resample = [rng.choice(scores) for _ in scores] boot_means.append(sum(resample) / len(resample)) boot_means.sort() ci_low = float(boot_means[24]) ci_high = float(boot_means[974]) return { "mean_score": mean_score, "std_score": std_score, "pass_rate": pass_rate, "mean_steps": mean_steps, "std_steps": std_steps, "mean_total_reward": mean_total_reward, "std_total_reward": std_total_reward, "mean_token_efficiency": mean_token_efficiency, "mean_mer_score": mean_mer_score, "mean_time_to_solve": mean_time_to_solve, "solve_rate": solve_rate, "score_ci_95": [ci_low, ci_high], "n_episodes": len(episodes), } def ascii_bar_chart( data: dict[str, float], title: str, width: int = 40, ) -> str: """ Render a horizontal ASCII bar chart. data = {label: value} where value in [0, 1] """ lines = [f"=== {title} ==="] for label, raw_value in data.items(): value = max(0.0, min(1.0, float(raw_value))) bar_len = int(round(value * width)) bar = "█" * bar_len lines.append(f"{label:<8} {bar:<{width}} {value:.3f}") return "\n".join(lines) def ascii_comparison_table( baseline: dict[str, dict], trained: dict[str, dict], metric: str = "mean_score", ) -> str: """ Render before/after comparison table. """ header = ( "Task Baseline Trained Delta CI (95%)\n" "─────────────────────────────────────────────────" ) rows = [header] for task in TASKS: base_value = float(baseline.get(task, {}).get(metric, 0.0)) trained_task = trained.get(task, {}) trained_value = float(trained_task.get(metric, 0.0)) delta = trained_value - base_value ci = trained_task.get("score_ci_95", [0.0, 0.0]) if not isinstance(ci, list) or len(ci) != 2: ci = [0.0, 0.0] rows.append( f"{task:<10} {base_value:>7.3f} {trained_value:>7.3f} {delta:+7.3f} [{ci[0]:.2f}, {ci[1]:.2f}]" ) return "\n".join(rows) def ascii_reward_curve(rewards: list[float], title: str) -> str: """ Render reward progression as ASCII line chart. Bucket rewards into 20 bins, show trend line. """ lines = [f"=== {title} ==="] if not rewards: lines.extend( [ "1.0 |", "0.8 |", "0.6 |", "0.4 |", "0.2 |", "0.0 |________________________", " 0 5 10 15 20", ] ) return "\n".join(lines) bucket_count = 20 bucketed: list[float] = [] n = len(rewards) for i in range(bucket_count): start = int(i * n / bucket_count) end = int((i + 1) * n / bucket_count) if end <= start: end = min(n, start + 1) segment = rewards[start:end] if segment: bucketed.append(sum(segment) / len(segment)) else: bucketed.append(0.0) levels = [1.0, 0.8, 0.6, 0.4, 0.2] for level in levels: row_chars: list[str] = [] for value in bucketed: row_chars.append("·" if value >= level else " ") lines.append(f"{level:.1f} |" + "".join(row_chars)) lines.append("0.0 |________________________") lines.append(" 0 5 10 15 20") return "\n".join(lines) def run_evaluation( agent_type: str, n_seeds: int = 50, output_file: str | None = None, ) -> dict[str, Any]: """ Run full evaluation across all tasks and n_seeds. """ if n_seeds <= 0: raise ValueError("n_seeds must be > 0") if agent_type not in {"oracle", "random", "trained"}: raise ValueError("agent_type must be one of: oracle, random, trained") seeds = SEEDS[:n_seeds] if n_seeds <= len(SEEDS) else list(range(n_seeds)) task_episodes: dict[str, list[dict[str, Any]]] = {task: [] for task in TASKS} task_summaries: dict[str, dict[str, Any]] = {} run_start = time.time() for task in TASKS: print(f"\nRunning task={task} with agent={agent_type} over {len(seeds)} seeds") episodes: list[dict[str, Any]] = [] for idx, seed in enumerate(seeds, start=1): if agent_type == "random": episode = run_random_episode(task, seed) else: episode = run_oracle_episode(task, seed) episodes.append(episode) score = float(episode.get("score", 0.01)) print(f" [{task}] seed {idx}/{len(seeds)}: score={score:.3f}") task_episodes[task] = episodes summary = aggregate_results(episodes) task_summaries[task] = summary print() print( ascii_bar_chart( {task: float(summary.get("mean_score", 0.0))}, title=f"{task.upper()} mean_score", ) ) print( ascii_bar_chart( {task: float(summary.get("pass_rate", 0.0))}, title=f"{task.upper()} pass_rate", ) ) print( ascii_reward_curve( [float(ep.get("mean_reward", 0.0)) for ep in episodes], title=f"{task.upper()} mean_reward trend", ) ) overall = { "mean_score": statistics.mean( [float(task_summaries[task].get("mean_score", 0.0)) for task in TASKS] ), "mean_pass_rate": statistics.mean( [float(task_summaries[task].get("pass_rate", 0.0)) for task in TASKS] ), "mean_token_efficiency": statistics.mean( [float(task_summaries[task].get("mean_token_efficiency", 0.0)) for task in TASKS] ), "elapsed_sec": round(time.time() - run_start, 2), } result = { "agent_type": agent_type, "timestamp": datetime.now().isoformat(), "n_seeds": len(seeds), "tasks": task_summaries, "episodes": task_episodes, "overall": overall, } print("\n=== OVERALL SUMMARY ===") print(json.dumps(overall, indent=2)) print() print( ascii_bar_chart( {task: float(task_summaries[task].get("mean_score", 0.0)) for task in TASKS}, "Mean Score by Task", ) ) print( ascii_bar_chart( {task: float(task_summaries[task].get("pass_rate", 0.0)) for task in TASKS}, "Pass Rate by Task", ) ) if output_file: output_dir = os.path.dirname(output_file) if output_dir: os.makedirs(output_dir, exist_ok=True) with open(output_file, "w", encoding="utf-8") as file: json.dump(result, file, indent=2) print(f"Saved evaluation JSON: {output_file}") return result def generate_comparison_report( baseline_file: str, trained_file: str, ) -> None: """ Load two eval JSON files and print comparison. """ with open(baseline_file, "r", encoding="utf-8") as file: baseline_data = json.load(file) with open(trained_file, "r", encoding="utf-8") as file: trained_data = json.load(file) baseline_tasks = baseline_data.get("tasks", {}) trained_tasks = trained_data.get("tasks", {}) sections: list[str] = [] sections.append("=== COMPARISON: mean_score ===") sections.append(ascii_comparison_table(baseline_tasks, trained_tasks, metric="mean_score")) sections.append("") sections.append("=== COMPARISON: pass_rate ===") sections.append(ascii_comparison_table(baseline_tasks, trained_tasks, metric="pass_rate")) sections.append("") sections.append("=== COMPARISON: mean_token_efficiency ===") sections.append( ascii_comparison_table( baseline_tasks, trained_tasks, metric="mean_token_efficiency", ) ) sections.append("") improved_tasks = 0 sections.append("=== TASK DELTAS ===") for task in TASKS: base_score = float(baseline_tasks.get(task, {}).get("mean_score", 0.0)) trained_score = float(trained_tasks.get(task, {}).get("mean_score", 0.0)) delta = trained_score - base_score rel = ((delta / base_score) * 100.0) if base_score > 0 else math.inf if delta > 0.05: improved_tasks += 1 if math.isinf(rel): rel_text = "inf" else: rel_text = f"{rel:+.2f}%" sections.append( f"{task:<8} delta={delta:+.3f} relative={rel_text} " f"(baseline={base_score:.3f} -> trained={trained_score:.3f})" ) verdict = ( f"TRAINING IMPROVED: {improved_tasks}/4 tasks showed meaningful gain (>0.05)" ) sections.append("") sections.append(verdict) report_text = "\n".join(sections) print(report_text) os.makedirs(RESULTS_DIR, exist_ok=True) report_path = os.path.join(RESULTS_DIR, "comparison_report.txt") with open(report_path, "w", encoding="utf-8") as file: file.write(report_text) print(f"Saved comparison report: {report_path}") if __name__ == "__main__": import sys os.makedirs(RESULTS_DIR, exist_ok=True) mode = sys.argv[1] if len(sys.argv) > 1 else "baseline" if mode == "baseline": print("Running ORACLE baseline evaluation (50 seeds)...") run_evaluation( agent_type="oracle", n_seeds=50, output_file=f"{RESULTS_DIR}/baseline_eval.json", ) print("\nRunning RANDOM baseline (50 seeds)...") run_evaluation( agent_type="random", n_seeds=50, output_file=f"{RESULTS_DIR}/random_eval.json", ) elif mode == "trained": print("Running post-training evaluation (50 seeds)...") run_evaluation( agent_type="oracle", n_seeds=50, output_file=f"{RESULTS_DIR}/trained_eval.json", ) elif mode == "compare": generate_comparison_report( baseline_file=f"{RESULTS_DIR}/baseline_eval.json", trained_file=f"{RESULTS_DIR}/trained_eval.json", ) elif mode == "quick": run_evaluation( agent_type="oracle", n_seeds=10, output_file=f"{RESULTS_DIR}/quick_eval.json", ) else: print(f"Unknown mode: {mode}") print("Usage: python scripts/evaluate.py [baseline|trained|compare|quick]") sys.exit(1)