"""Helpers for locating agent result directories and reading judge outputs."""

import json
from pathlib import Path
from typing import Optional

# Model display names (short for figures)
# Follows ArtificialAnalysis.ai naming conventions
MODEL_DISPLAY_NAMES = {
    # OpenAI / Azure
    "Azure_gpt-5.1-2025-11-13": "GPT-5.1",
    "Azure_gpt-5.1-chat-2025-11-13": "GPT-5.1",
    "Azure_o4-mini": "o4-mini",
    "Azure_gpt-4o": "GPT-4o",
    "openai_gpt-oss-120b": "GPT-OSS-120B",
    "openai_gpt-oss-20b": "GPT-OSS-20B",
    # Google / GCP
    "GCP_gemini-2.5-pro": "Gemini 2.5 Pro",
    "gemini-2.5-pro": "Gemini 2.5 Pro",
    "gcp_gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-flash-preview": "Gemini 3 Flash",
    "google_gemini-3-flash-preview": "Gemini 3 Flash",
    # Moonshot AI
    "moonshotai_kimi-k2-thinking": "Kimi K2",
    "kimi-k2-thinking": "Kimi K2",
    # Anthropic / AWS
    "aws_claude-opus-4-5": "Claude Opus 4.5",
    # Mistral AI
    "mistralai_mistral-large-2512": "Mistral Large",
    # Alibaba / Qwen
    "qwen_qwen3-vl-32b-instruct": "Qwen3-VL-32B",
    # ServiceNow
    "ServiceNow-AI_Apriel-1.6-15b-Thinker": "Apriel-1.6-15B",
}


def get_model_name(dirname: str) -> str:
    """Return the display name for an agent directory name.

    Every occurrence of ``"react with code_"`` and ``"_07ccdb1"`` is removed
    from ``dirname``; the remainder is looked up in ``MODEL_DISPLAY_NAMES``.
    If there is no entry, the stripped name itself is returned.
    """
    name = dirname.replace("react with code_", "").replace("_07ccdb1", "")
    return MODEL_DISPLAY_NAMES.get(name, name)


def find_react_with_code_dirs(leaderboard_dir: Path) -> list[Path]:
    """Find all agent directories (excluding hidden and backup directories).

    Returns the immediate subdirectories of ``leaderboard_dir`` whose names do
    not start with ``"."`` or ``"backup_"``, sorted by path.
    """
    return sorted(
        d
        for d in leaderboard_dir.iterdir()
        if d.is_dir() and not d.name.startswith((".", "backup_"))
    )


def _has_scenario_dirs(directory: Path) -> bool:
    """Return True if ``directory`` directly contains a ``Scenario*`` directory."""
    return any(
        d.name.startswith("Scenario") for d in directory.iterdir() if d.is_dir()
    )


def _trial_sort_key(trial_dir: Path) -> tuple[int, int, str]:
    """Sort key that orders numeric trial directory names numerically.

    Purely numeric names come first in integer order (so "2" precedes "10");
    all other names follow in lexicographic order.
    """
    name = trial_dir.name
    if name.isascii() and name.isdigit():
        return (0, int(name), name)
    return (1, 0, name)


def read_judge_outputs_from_dir(agent_dir: Path) -> dict[str, list[dict]]:
    """
    Read all judge_output.json files from an agent directory.

    The scenario folders may live directly in ``agent_dir`` or one level
    deeper (e.g., agent_dir/sre/Scenario-1, agent_dir/finops/Scenario-1).
    Files that cannot be read or parsed are skipped with a printed warning.

    Returns:
        Dict mapping scenario_id -> list of judge outputs (one per trial),
        with trials ordered by trial directory name (numeric names in
        numeric order). Scenarios without any readable judge output are
        omitted.
    """
    scenario_data = {}

    if not _has_scenario_dirs(agent_dir):
        # Look for subdirectories that might contain scenarios (sre, finops, etc.).
        # Sorted so the choice below does not depend on filesystem listing order.
        subdirs = sorted(
            d
            for d in agent_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        )
        if len(subdirs) == 1:
            # If there's exactly one subdirectory, use it
            agent_dir = subdirs[0]
        elif len(subdirs) > 1:
            # If there are multiple, use the first one with Scenario folders
            for subdir in subdirs:
                if _has_scenario_dirs(subdir):
                    agent_dir = subdir
                    break

    for scenario_dir in agent_dir.iterdir():
        if not scenario_dir.is_dir() or not scenario_dir.name.startswith("Scenario"):
            continue

        trials = []
        # Look for trial subdirectories (1, 2, 3, etc.)
        for trial_dir in sorted(scenario_dir.iterdir(), key=_trial_sort_key):
            if not trial_dir.is_dir():
                continue
            judge_file = trial_dir / "judge_output.json"
            if not judge_file.exists():
                continue
            try:
                with open(judge_file, encoding="utf-8") as f:
                    judge_data = json.load(f)
            except Exception as e:
                # Deliberately best-effort: one bad file must not abort the scan.
                print(f" Warning: Error reading {judge_file}: {e}")
            else:
                trials.append(judge_data)

        if trials:
            scenario_data[scenario_dir.name] = trials

    return scenario_data


def extract_trial_scores_from_judge_outputs(
    scenario_data: dict[str, list[dict]], metric: str
) -> dict[str, list[float]]:
    """
    Extract per-trial scores for a given metric from judge outputs.

    A trial whose ``flat_scores`` entry is missing, null, or not a dict, or
    whose metric value is missing or null, contributes a score of 0.0.

    Args:
        scenario_data: Dict mapping scenario_id -> list of judge outputs
        metric: The metric name to extract

    Returns:
        Dict mapping scenario_id -> list of trial scores. Scenarios with an
        empty trial list are omitted.
    """
    scenario_trials = {}
    for scenario_id, trials in scenario_data.items():
        scores = []
        for trial in trials:
            flat_scores = trial.get("flat_scores")
            if not isinstance(flat_scores, dict):
                # Key present but null (or malformed): treat as no scores.
                flat_scores = {}
            score = flat_scores.get(metric)
            # Handle None/null values
            scores.append(0.0 if score is None else float(score))
        if scores:
            scenario_trials[scenario_id] = scores
    return scenario_trials


def get_runs_stats(
    scenario_data: dict[str, list], min_runs_required: int
) -> tuple[int, int, int, int]:
    """Get run statistics: (n_scenarios, min_runs, max_runs, n_qualifying).

    ``n_qualifying`` counts scenarios with at least ``min_runs_required``
    runs. Empty input yields ``(0, 0, 0, 0)``.
    """
    if not scenario_data:
        return 0, 0, 0, 0
    run_counts = [len(trials) for trials in scenario_data.values()]
    n_qualifying = sum(1 for c in run_counts if c >= min_runs_required)
    return len(scenario_data), min(run_counts), max(run_counts), n_qualifying


def filter_scenarios_with_min_runs(
    scenario_data: dict[str, list], min_runs_required: int
) -> dict[str, list]:
    """Filter to only include scenarios with >= min_runs_required runs."""
    return {
        scenario_id: trials
        for scenario_id, trials in scenario_data.items()
        if len(trials) >= min_runs_required
    }


def find_latest_rollout_file(trial_dir: Path) -> Optional[Path]:
    """Find the latest rollout file in a trial's sessions directory or session.jsonl.

    Returns ``trial_dir / "session.jsonl"`` if it exists. Otherwise returns
    the most recently modified ``rollout-*.jsonl`` found recursively under
    ``trial_dir / "sessions"``, or ``None`` if there is no such file.
    """
    # First check for session.jsonl directly in trial_dir (new structure)
    session_file = trial_dir / "session.jsonl"
    if session_file.exists():
        return session_file

    # Fall back to sessions/rollout-*.jsonl (old structure)
    sessions_dir = trial_dir / "sessions"
    if not sessions_dir.exists():
        return None

    rollout_files = list(sessions_dir.rglob("rollout-*.jsonl"))
    if not rollout_files:
        return None

    # Pick the file with the newest modification time
    return max(rollout_files, key=lambda p: p.stat().st_mtime)