nervousystem-env / scripts /evaluate.py
vx7sh's picture
docs(evidence): add multi-agent eval, long-horizon trace, and constrained eval logs
9731ebe
from __future__ import annotations
import requests
import json
import statistics
import math
import os
import random
import time
from typing import Any
from datetime import datetime
BASE = "http://localhost:7860"
SEEDS = list(range(50))
TASKS = ["easy", "medium", "hard", "cascade", "fleet_coordination", "black_swan"]
RESULTS_DIR = "results"
MAX_STEPS_RANDOM = 15
PATCH_FILES = [
"model/transformer.py",
"model/attention.py",
"model/feedforward.py",
"model/embedding.py",
]
def _error_episode(task_id: str, seed: int, error: str) -> dict[str, Any]:
return {
"task_id": task_id,
"seed": seed,
"score": 0.01,
"passed": False,
"steps_taken": 0,
"total_reward": 0.0,
"mean_reward": 0.0,
"token_efficiency_score": 0.0,
"mer_score": 0.0,
"time_to_solve": None,
"failure_mode": "error",
"breakdown": {},
"error": error,
}
def _post(path: str, payload: dict[str, Any], timeout: int = 15) -> dict[str, Any]:
response = requests.post(f"{BASE}{path}", json=payload, timeout=timeout)
response.raise_for_status()
return response.json()
def _grade(task_id: str) -> dict[str, Any]:
return _post("/grade", {"task_id": task_id}, timeout=20)
def _extract_failing_node(obs: dict[str, Any]) -> int:
nodes = obs.get("nodes", [])
for node in nodes:
if node.get("health_status") == "failed":
node_id = node.get("node_id")
if isinstance(node_id, int):
return node_id
return 0
def _apply_step(
action: dict[str, Any],
step_rewards: list[float],
last_token_eff: float,
steps_taken: int,
first_recovery_step: int | None,
) -> tuple[dict[str, Any], list[float], float, int, int | None]:
step_result = _post("/step", action, timeout=20)
reward_obj = step_result.get("reward", {})
reward_value = float(reward_obj.get("value", 0.0))
token_efficiency_score = float(reward_obj.get("token_efficiency_score", 0.0))
steps_taken += 1
step_rewards.append(reward_value)
last_token_eff = token_efficiency_score
observation = step_result.get("observation", {})
job_status = str(observation.get("training", {}).get("job_status", ""))
if first_recovery_step is None and job_status == "recovered":
first_recovery_step = steps_taken
return step_result, step_rewards, last_token_eff, steps_taken, first_recovery_step
def run_oracle_episode(task_id: str, seed: int) -> dict[str, Any]:
"""
Run a perfect oracle agent for given task and seed.
Returns full episode metrics.
"""
try:
obs = _post("/reset", {"task_id": task_id, "seed": seed}, timeout=20)
steps_taken = 0
step_rewards: list[float] = []
last_token_efficiency = 0.0
first_recovery_step: int | None = None
if task_id == "easy":
failing_node = _extract_failing_node(obs)
action = {
"action_type": "inspect_flight_recorder",
"parameters": {"rank_id": failing_node},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
elif task_id == "medium":
action = {
"action_type": "topo_reorder",
"parameters": {"affinity": "rack"},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
for _ in range(6):
action = {"action_type": "noop", "parameters": {}}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
elif task_id == "hard":
action = {
"action_type": "query_nccl_logs",
"parameters": {"time_window": 5},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
action = {
"action_type": "inspect_flight_recorder",
"parameters": {"rank_id": 0},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
selected_file = PATCH_FILES[0]
for file_name in PATCH_FILES:
action = {
"action_type": "patch_divergent_code",
"parameters": {
"file": file_name,
"fix_type": "identify_file",
},
}
step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
reward_info = str(step_result.get("reward", {}).get("info", "")).lower()
if "stage 1" in reward_info or "stage 2" in reward_info or "stage 3" in reward_info:
selected_file = file_name
break
action = {
"action_type": "patch_divergent_code",
"parameters": {
"file": selected_file,
"fix_type": "propose_diff",
},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
action = {
"action_type": "patch_divergent_code",
"parameters": {
"file": selected_file,
"fix_type": "synchronize_conditional",
},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
elif task_id == "cascade":
failing_node = _extract_failing_node(obs)
action = {
"action_type": "inspect_flight_recorder",
"parameters": {"rank_id": failing_node},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
action = {
"action_type": "topo_reorder",
"parameters": {"affinity": "rack"},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
action = {
"action_type": "query_nccl_logs",
"parameters": {"time_window": 5},
}
_, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
for file_name in PATCH_FILES:
action = {
"action_type": "patch_divergent_code",
"parameters": {
"file": file_name,
"fix_type": "synchronize_conditional",
},
}
step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
if float(step_result.get("reward", {}).get("value", 0.0)) > 0.1:
break
elif task_id == "fleet_coordination":
failing_node = _extract_failing_node(obs)
_post(
"/delegate",
{
"worker": "log_inspector",
"action": "inspect_flight_recorder",
"parameters": {"rank_id": failing_node},
"supervisor_reasoning": "Confirm root-cause rank before remediation.",
},
timeout=20,
)
_post(
"/delegate",
{
"worker": "version_checker",
"action": "check_nccl_version",
"parameters": {},
"supervisor_reasoning": "Confirm NCCL version before coalition action.",
},
timeout=20,
)
_post(
"/coalition",
{
"proposing_worker": "topo_agent",
"supporting_worker": "version_checker",
"action": "topology_version_fix",
"parameters": {},
"rationale": "Topology and version mismatch must be fixed jointly.",
},
timeout=20,
)
steps_taken = 3
first_recovery_step = 3
elif task_id == "black_swan":
# Experimental stress task is intentionally unsolved by the short oracle.
pass
else:
raise ValueError(f"Unsupported task_id: {task_id}")
grade = _grade(task_id)
score = float(grade.get("score", 0.01))
passed = bool(grade.get("passed", score >= 0.5))
breakdown = grade.get("breakdown", {}) or {}
mer_score = float(breakdown.get("mer_score", 0.0))
explanation = str(grade.get("explanation", ""))
total_reward = float(sum(step_rewards))
mean_reward = float(total_reward / len(step_rewards)) if step_rewards else 0.0
return {
"task_id": task_id,
"seed": seed,
"score": score,
"passed": passed,
"steps_taken": steps_taken,
"total_reward": total_reward,
"mean_reward": mean_reward,
"token_efficiency_score": float(last_token_efficiency),
"mer_score": mer_score,
"time_to_solve": first_recovery_step,
"failure_mode": explanation,
"breakdown": breakdown,
}
except Exception as exc:
return _error_episode(task_id=task_id, seed=seed, error=str(exc))
def run_random_episode(task_id: str, seed: int) -> dict[str, Any]:
"""
Run a random agent (picks random valid action each step).
Used as lower baseline floor.
"""
try:
_post("/reset", {"task_id": task_id, "seed": seed}, timeout=20)
rng = random.Random(seed)
actions_pool = [
"inspect_flight_recorder",
"query_nccl_logs",
"topo_reorder",
"patch_divergent_code",
"noop",
]
step_rewards: list[float] = []
last_token_efficiency = 0.0
steps_taken = 0
first_recovery_step: int | None = None
for _ in range(MAX_STEPS_RANDOM):
action_type = rng.choice(actions_pool)
if action_type == "inspect_flight_recorder":
action = {
"action_type": action_type,
"parameters": {"rank_id": rng.randint(0, 7)},
}
elif action_type == "query_nccl_logs":
action = {
"action_type": action_type,
"parameters": {"time_window": rng.randint(3, 10)},
}
elif action_type == "topo_reorder":
action = {
"action_type": action_type,
"parameters": {"affinity": rng.choice(["rack", "node"])},
}
elif action_type == "patch_divergent_code":
action = {
"action_type": action_type,
"parameters": {
"file": rng.choice(PATCH_FILES),
"fix_type": "synchronize_conditional",
},
}
else:
action = {"action_type": "noop", "parameters": {}}
step_result, step_rewards, last_token_efficiency, steps_taken, first_recovery_step = _apply_step(
action,
step_rewards,
last_token_efficiency,
steps_taken,
first_recovery_step,
)
if bool(step_result.get("done", False)):
break
grade = _grade(task_id)
score = float(grade.get("score", 0.01))
passed = bool(grade.get("passed", score >= 0.5))
breakdown = grade.get("breakdown", {}) or {}
mer_score = float(breakdown.get("mer_score", 0.0))
explanation = str(grade.get("explanation", ""))
total_reward = float(sum(step_rewards))
mean_reward = float(total_reward / len(step_rewards)) if step_rewards else 0.0
return {
"task_id": task_id,
"seed": seed,
"score": score,
"passed": passed,
"steps_taken": steps_taken,
"total_reward": total_reward,
"mean_reward": mean_reward,
"token_efficiency_score": float(last_token_efficiency),
"mer_score": mer_score,
"time_to_solve": first_recovery_step,
"failure_mode": explanation,
"breakdown": breakdown,
}
except Exception as exc:
return _error_episode(task_id=task_id, seed=seed, error=str(exc))
def aggregate_results(episodes: list[dict]) -> dict[str, Any]:
"""
Aggregate episode results into summary statistics.
"""
if not episodes:
return {
"mean_score": 0.0,
"std_score": 0.0,
"pass_rate": 0.0,
"mean_steps": 0.0,
"std_steps": 0.0,
"mean_total_reward": 0.0,
"std_total_reward": 0.0,
"mean_token_efficiency": 0.0,
"mean_mer_score": 0.0,
"mean_time_to_solve": 0.0,
"solve_rate": 0.0,
"score_ci_95": [0.0, 0.0],
"n_episodes": 0,
}
scores = [float(ep.get("score", 0.0)) for ep in episodes]
steps = [float(ep.get("steps_taken", 0.0)) for ep in episodes]
total_rewards = [float(ep.get("total_reward", 0.0)) for ep in episodes]
token_eff = [float(ep.get("token_efficiency_score", 0.0)) for ep in episodes]
mer_scores = [float(ep.get("mer_score", 0.0)) for ep in episodes]
passes = [bool(ep.get("passed", False)) for ep in episodes]
solved_times = [
float(ep["time_to_solve"])
for ep in episodes
if ep.get("time_to_solve") is not None
]
mean_score = statistics.mean(scores)
std_score = statistics.stdev(scores) if len(scores) > 1 else 0.0
pass_rate = sum(1 for passed in passes if passed) / len(passes)
mean_steps = statistics.mean(steps)
std_steps = statistics.stdev(steps) if len(steps) > 1 else 0.0
mean_total_reward = statistics.mean(total_rewards)
std_total_reward = statistics.stdev(total_rewards) if len(total_rewards) > 1 else 0.0
mean_token_efficiency = statistics.mean(token_eff)
mean_mer_score = statistics.mean(mer_scores)
mean_time_to_solve = statistics.mean(solved_times) if solved_times else 0.0
solve_rate = len(solved_times) / len(episodes)
rng = random.Random(42)
boot_means: list[float] = []
for _ in range(1000):
resample = [rng.choice(scores) for _ in scores]
boot_means.append(sum(resample) / len(resample))
boot_means.sort()
ci_low = float(boot_means[24])
ci_high = float(boot_means[974])
return {
"mean_score": mean_score,
"std_score": std_score,
"pass_rate": pass_rate,
"mean_steps": mean_steps,
"std_steps": std_steps,
"mean_total_reward": mean_total_reward,
"std_total_reward": std_total_reward,
"mean_token_efficiency": mean_token_efficiency,
"mean_mer_score": mean_mer_score,
"mean_time_to_solve": mean_time_to_solve,
"solve_rate": solve_rate,
"score_ci_95": [ci_low, ci_high],
"n_episodes": len(episodes),
}
def ascii_bar_chart(
data: dict[str, float],
title: str,
width: int = 40,
) -> str:
"""
Render a horizontal ASCII bar chart.
data = {label: value} where value in [0, 1]
"""
lines = [f"=== {title} ==="]
for label, raw_value in data.items():
value = max(0.0, min(1.0, float(raw_value)))
bar_len = int(round(value * width))
bar = "█" * bar_len
lines.append(f"{label:<8} {bar:<{width}} {value:.3f}")
return "\n".join(lines)
def ascii_comparison_table(
baseline: dict[str, dict],
trained: dict[str, dict],
metric: str = "mean_score",
) -> str:
"""
Render before/after comparison table.
"""
header = (
"Task Baseline Trained Delta CI (95%)\n"
"─────────────────────────────────────────────────"
)
rows = [header]
for task in TASKS:
base_value = float(baseline.get(task, {}).get(metric, 0.0))
trained_task = trained.get(task, {})
trained_value = float(trained_task.get(metric, 0.0))
delta = trained_value - base_value
ci = trained_task.get("score_ci_95", [0.0, 0.0])
if not isinstance(ci, list) or len(ci) != 2:
ci = [0.0, 0.0]
rows.append(
f"{task:<10} {base_value:>7.3f} {trained_value:>7.3f} {delta:+7.3f} [{ci[0]:.2f}, {ci[1]:.2f}]"
)
return "\n".join(rows)
def ascii_reward_curve(rewards: list[float], title: str) -> str:
"""
Render reward progression as ASCII line chart.
Bucket rewards into 20 bins, show trend line.
"""
lines = [f"=== {title} ==="]
if not rewards:
lines.extend(
[
"1.0 |",
"0.8 |",
"0.6 |",
"0.4 |",
"0.2 |",
"0.0 |________________________",
" 0 5 10 15 20",
]
)
return "\n".join(lines)
bucket_count = 20
bucketed: list[float] = []
n = len(rewards)
for i in range(bucket_count):
start = int(i * n / bucket_count)
end = int((i + 1) * n / bucket_count)
if end <= start:
end = min(n, start + 1)
segment = rewards[start:end]
if segment:
bucketed.append(sum(segment) / len(segment))
else:
bucketed.append(0.0)
levels = [1.0, 0.8, 0.6, 0.4, 0.2]
for level in levels:
row_chars: list[str] = []
for value in bucketed:
row_chars.append("·" if value >= level else " ")
lines.append(f"{level:.1f} |" + "".join(row_chars))
lines.append("0.0 |________________________")
lines.append(" 0 5 10 15 20")
return "\n".join(lines)
def run_evaluation(
agent_type: str,
n_seeds: int = 50,
output_file: str | None = None,
) -> dict[str, Any]:
"""
Run full evaluation across all tasks and n_seeds.
"""
if n_seeds <= 0:
raise ValueError("n_seeds must be > 0")
if agent_type not in {"oracle", "random", "trained"}:
raise ValueError("agent_type must be one of: oracle, random, trained")
seeds = SEEDS[:n_seeds] if n_seeds <= len(SEEDS) else list(range(n_seeds))
task_episodes: dict[str, list[dict[str, Any]]] = {task: [] for task in TASKS}
task_summaries: dict[str, dict[str, Any]] = {}
run_start = time.time()
for task in TASKS:
print(f"\nRunning task={task} with agent={agent_type} over {len(seeds)} seeds")
episodes: list[dict[str, Any]] = []
for idx, seed in enumerate(seeds, start=1):
if agent_type == "random":
episode = run_random_episode(task, seed)
else:
episode = run_oracle_episode(task, seed)
episodes.append(episode)
score = float(episode.get("score", 0.01))
print(f" [{task}] seed {idx}/{len(seeds)}: score={score:.3f}")
task_episodes[task] = episodes
summary = aggregate_results(episodes)
task_summaries[task] = summary
print()
print(
ascii_bar_chart(
{task: float(summary.get("mean_score", 0.0))},
title=f"{task.upper()} mean_score",
)
)
print(
ascii_bar_chart(
{task: float(summary.get("pass_rate", 0.0))},
title=f"{task.upper()} pass_rate",
)
)
print(
ascii_reward_curve(
[float(ep.get("mean_reward", 0.0)) for ep in episodes],
title=f"{task.upper()} mean_reward trend",
)
)
overall = {
"mean_score": statistics.mean(
[float(task_summaries[task].get("mean_score", 0.0)) for task in TASKS]
),
"mean_pass_rate": statistics.mean(
[float(task_summaries[task].get("pass_rate", 0.0)) for task in TASKS]
),
"mean_token_efficiency": statistics.mean(
[float(task_summaries[task].get("mean_token_efficiency", 0.0)) for task in TASKS]
),
"elapsed_sec": round(time.time() - run_start, 2),
}
result = {
"agent_type": agent_type,
"timestamp": datetime.now().isoformat(),
"n_seeds": len(seeds),
"tasks": task_summaries,
"episodes": task_episodes,
"overall": overall,
}
print("\n=== OVERALL SUMMARY ===")
print(json.dumps(overall, indent=2))
print()
print(
ascii_bar_chart(
{task: float(task_summaries[task].get("mean_score", 0.0)) for task in TASKS},
"Mean Score by Task",
)
)
print(
ascii_bar_chart(
{task: float(task_summaries[task].get("pass_rate", 0.0)) for task in TASKS},
"Pass Rate by Task",
)
)
if output_file:
output_dir = os.path.dirname(output_file)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
with open(output_file, "w", encoding="utf-8") as file:
json.dump(result, file, indent=2)
print(f"Saved evaluation JSON: {output_file}")
return result
def generate_comparison_report(
baseline_file: str,
trained_file: str,
) -> None:
"""
Load two eval JSON files and print comparison.
"""
with open(baseline_file, "r", encoding="utf-8") as file:
baseline_data = json.load(file)
with open(trained_file, "r", encoding="utf-8") as file:
trained_data = json.load(file)
baseline_tasks = baseline_data.get("tasks", {})
trained_tasks = trained_data.get("tasks", {})
sections: list[str] = []
sections.append("=== COMPARISON: mean_score ===")
sections.append(ascii_comparison_table(baseline_tasks, trained_tasks, metric="mean_score"))
sections.append("")
sections.append("=== COMPARISON: pass_rate ===")
sections.append(ascii_comparison_table(baseline_tasks, trained_tasks, metric="pass_rate"))
sections.append("")
sections.append("=== COMPARISON: mean_token_efficiency ===")
sections.append(
ascii_comparison_table(
baseline_tasks,
trained_tasks,
metric="mean_token_efficiency",
)
)
sections.append("")
improved_tasks = 0
sections.append("=== TASK DELTAS ===")
for task in TASKS:
base_score = float(baseline_tasks.get(task, {}).get("mean_score", 0.0))
trained_score = float(trained_tasks.get(task, {}).get("mean_score", 0.0))
delta = trained_score - base_score
rel = ((delta / base_score) * 100.0) if base_score > 0 else math.inf
if delta > 0.05:
improved_tasks += 1
if math.isinf(rel):
rel_text = "inf"
else:
rel_text = f"{rel:+.2f}%"
sections.append(
f"{task:<8} delta={delta:+.3f} relative={rel_text} "
f"(baseline={base_score:.3f} -> trained={trained_score:.3f})"
)
verdict = (
f"TRAINING IMPROVED: {improved_tasks}/4 tasks showed meaningful gain (>0.05)"
)
sections.append("")
sections.append(verdict)
report_text = "\n".join(sections)
print(report_text)
os.makedirs(RESULTS_DIR, exist_ok=True)
report_path = os.path.join(RESULTS_DIR, "comparison_report.txt")
with open(report_path, "w", encoding="utf-8") as file:
file.write(report_text)
print(f"Saved comparison report: {report_path}")
if __name__ == "__main__":
import sys
os.makedirs(RESULTS_DIR, exist_ok=True)
mode = sys.argv[1] if len(sys.argv) > 1 else "baseline"
if mode == "baseline":
print("Running ORACLE baseline evaluation (50 seeds)...")
run_evaluation(
agent_type="oracle",
n_seeds=50,
output_file=f"{RESULTS_DIR}/baseline_eval.json",
)
print("\nRunning RANDOM baseline (50 seeds)...")
run_evaluation(
agent_type="random",
n_seeds=50,
output_file=f"{RESULTS_DIR}/random_eval.json",
)
elif mode == "trained":
print("Running post-training evaluation (50 seeds)...")
run_evaluation(
agent_type="oracle",
n_seeds=50,
output_file=f"{RESULTS_DIR}/trained_eval.json",
)
elif mode == "compare":
generate_comparison_report(
baseline_file=f"{RESULTS_DIR}/baseline_eval.json",
trained_file=f"{RESULTS_DIR}/trained_eval.json",
)
elif mode == "quick":
run_evaluation(
agent_type="oracle",
n_seeds=10,
output_file=f"{RESULTS_DIR}/quick_eval.json",
)
else:
print(f"Unknown mode: {mode}")
print("Usage: python scripts/evaluate.py [baseline|trained|compare|quick]")
sys.exit(1)