nervousystem-env / scripts /evaluate_multi_agent.py
vx7sh's picture
docs(evidence): add multi-agent eval, long-horizon trace, and constrained eval logs
9731ebe
from __future__ import annotations
import json
import os
import statistics
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app.env import NervousSystemEnv
from app.models import CoalitionRequest, DelegateRequest, SREAction
RESULTS_DIR = "results"
def failing_rank(obs: dict[str, Any]) -> int:
for node in obs["nodes"]:
if node["health_status"] == "failed":
return int(node["node_id"])
return 0
def run_coordination_oracle(seed: int) -> dict[str, Any]:
env = NervousSystemEnv(seed=seed)
obs = env.reset(task_id="fleet_coordination", seed=seed).model_dump()
rank_id = failing_rank(obs)
actions: list[dict[str, Any]] = []
actions.append(
env.delegate(
DelegateRequest(
worker="log_inspector",
action="inspect_flight_recorder",
parameters={"rank_id": rank_id},
supervisor_reasoning="Confirm root-cause rank before remediation.",
)
)
)
actions.append(
env.delegate(
DelegateRequest(
worker="version_checker",
action="check_nccl_version",
parameters={},
supervisor_reasoning="Cascade symptoms require version compatibility evidence.",
)
)
)
consensus = (
env._fleet.get_worker_consensus(env._cluster)
if env._fleet is not None and env._cluster is not None
else {}
)
coalition = env.coalition_action(
CoalitionRequest(
proposing_worker="topo_agent",
supporting_worker="version_checker",
action="topology_version_fix",
parameters={},
rationale="Topology and NCCL version mismatch must be fixed jointly.",
)
)
grade_result = env.grade("fleet_coordination")
grade = {
"score": grade_result.score,
"passed": grade_result.passed,
"breakdown": grade_result.breakdown,
"explanation": grade_result.explanation,
"task_id": "fleet_coordination",
}
episode = env.get_episode_summary()
return {
"seed": seed,
"score": float(grade["score"]),
"passed": bool(grade["passed"]),
"grade": grade,
"delegations": actions,
"consensus": consensus,
"coalition": coalition,
"episode": episode,
}
def run_direct_baseline(seed: int) -> dict[str, Any]:
env = NervousSystemEnv(seed=seed)
env.reset(task_id="fleet_coordination", seed=seed)
step = env.step(
SREAction(action_type="topo_reorder", parameters={"affinity": "rack"})
)
grade_result = env.grade("fleet_coordination")
grade = {
"score": grade_result.score,
"passed": grade_result.passed,
"breakdown": grade_result.breakdown,
"explanation": grade_result.explanation,
"task_id": "fleet_coordination",
}
return {
"seed": seed,
"score": float(grade["score"]),
"passed": bool(grade["passed"]),
"step": step.model_dump(),
"grade": grade,
}
def summarize(episodes: list[dict[str, Any]]) -> dict[str, Any]:
scores = [float(ep["score"]) for ep in episodes]
return {
"mean_score": statistics.mean(scores) if scores else 0.0,
"pass_rate": sum(1 for ep in episodes if ep["passed"]) / max(1, len(episodes)),
"n_episodes": len(episodes),
}
def main() -> None:
os.makedirs(RESULTS_DIR, exist_ok=True)
seeds = list(range(int(os.getenv("MULTI_AGENT_SEEDS", "10"))))
oracle = [run_coordination_oracle(seed) for seed in seeds]
direct = [run_direct_baseline(seed) for seed in seeds]
result = {
"timestamp": datetime.now().isoformat(),
"task_id": "fleet_coordination",
"oracle_summary": summarize(oracle),
"direct_baseline_summary": summarize(direct),
"oracle_episodes": oracle,
"direct_baseline_episodes": direct,
}
path = os.path.join(RESULTS_DIR, "multi_agent_eval.json")
with open(path, "w", encoding="utf-8") as file:
json.dump(result, file, indent=2)
print(json.dumps(result["oracle_summary"], indent=2))
print(json.dumps(result["direct_baseline_summary"], indent=2))
print(f"Saved {path}")
if __name__ == "__main__":
main()