Spaces:
Sleeping
Sleeping
| import json | |
| from pathlib import Path | |
# Model display names (short for figures)
# Follows ArtificialAnalysis.ai naming conventions
# Maps raw result-directory model identifiers (provider-prefixed, sometimes
# with a date/version suffix) to the short label shown in plots/tables.
# Several keys intentionally map to the same display name because the same
# model appears under different provider prefixes or API aliases.
MODEL_DISPLAY_NAMES = {
    # OpenAI / Azure
    "Azure_gpt-5.1-2025-11-13": "GPT-5.1",
    "Azure_gpt-5.1-chat-2025-11-13": "GPT-5.1",
    "Azure_o4-mini": "o4-mini",
    "Azure_gpt-4o": "GPT-4o",
    "openai_gpt-oss-120b": "GPT-OSS-120B",
    "openai_gpt-oss-20b": "GPT-OSS-20B",
    # Google / GCP
    "GCP_gemini-2.5-pro": "Gemini 2.5 Pro",
    "gemini-2.5-pro": "Gemini 2.5 Pro",
    "gcp_gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-flash-preview": "Gemini 3 Flash",
    "google_gemini-3-flash-preview": "Gemini 3 Flash",
    # Moonshot AI
    "moonshotai_kimi-k2-thinking": "Kimi K2",
    "kimi-k2-thinking": "Kimi K2",
    # Anthropic / AWS
    "aws_claude-opus-4-5": "Claude Opus 4.5",
    # Mistral AI
    "mistralai_mistral-large-2512": "Mistral Large",
    # Alibaba / Qwen
    "qwen_qwen3-vl-32b-instruct": "Qwen3-VL-32B",
    # ServiceNow
    "ServiceNow-AI_Apriel-1.6-15b-Thinker": "Apriel-1.6-15B",
}
def get_model_name(dirname: str) -> str:
    """Extract model name from directory name."""
    # Strip the agent prefix and the run-hash suffix, then map the remaining
    # raw model id to its display name (falling back to the raw id itself).
    raw = dirname.replace("react with code_", "")
    raw = raw.replace("_07ccdb1", "")
    return MODEL_DISPLAY_NAMES.get(raw, raw)
def find_react_with_code_dirs(leaderboard_dir: Path) -> list[Path]:
    """Find all agent directories (excluding hidden and backup directories)."""
    return sorted(
        entry
        for entry in leaderboard_dir.iterdir()
        # Keep real directories that are neither hidden nor backups.
        if entry.is_dir() and not entry.name.startswith((".", "backup_"))
    )
def read_judge_outputs_from_dir(agent_dir: Path) -> dict[str, list[dict]]:
    """
    Read all judge_output.json files from an agent directory.

    The layout is either agent_dir/Scenario-*/<trial>/judge_output.json, or
    nested one level deeper (e.g. agent_dir/sre/Scenario-*/...); in the nested
    case the first subdirectory containing Scenario folders is used.

    Args:
        agent_dir: Root directory holding one agent's results.

    Returns:
        Dict mapping scenario_id -> list of judge outputs (one per trial).
        Scenarios with no readable judge_output.json files are omitted.
    """
    scenario_data: dict[str, list[dict]] = {}

    # Check if directory contains Scenario folders directly, or if we need to
    # go one level deeper (e.g., agent_dir/sre/Scenario-1, agent_dir/finops/...)
    has_scenarios = any(
        d.name.startswith("Scenario") for d in agent_dir.iterdir() if d.is_dir()
    )
    if not has_scenarios:
        # Look for subdirectories that might contain scenarios (sre, finops, etc.)
        subdirs = [d for d in agent_dir.iterdir() if d.is_dir() and not d.name.startswith(".")]
        if len(subdirs) == 1:
            # Exactly one subdirectory: assume it holds the scenarios.
            agent_dir = subdirs[0]
        elif len(subdirs) > 1:
            # Multiple candidates: use the first one with Scenario folders.
            for subdir in subdirs:
                if any(d.name.startswith("Scenario") for d in subdir.iterdir() if d.is_dir()):
                    agent_dir = subdir
                    break

    for scenario_dir in agent_dir.iterdir():
        if not scenario_dir.is_dir() or not scenario_dir.name.startswith("Scenario"):
            continue

        # Trial subdirectories are typically named 1, 2, 3, ...; sort
        # numerically when possible so trial "10" follows "9", not "1".
        trial_dirs = sorted(
            (d for d in scenario_dir.iterdir() if d.is_dir()),
            key=lambda d: (0, int(d.name), "") if d.name.isdigit() else (1, 0, d.name),
        )

        trials = []
        for trial_dir in trial_dirs:
            judge_file = trial_dir / "judge_output.json"
            if not judge_file.exists():
                continue
            try:
                with open(judge_file) as f:
                    trials.append(json.load(f))
            except (OSError, json.JSONDecodeError) as e:
                # Skip unreadable/corrupt files but keep processing the rest.
                print(f" Warning: Error reading {judge_file}: {e}")

        if trials:
            scenario_data[scenario_dir.name] = trials
    return scenario_data
def extract_trial_scores_from_judge_outputs(
    scenario_data: dict[str, list[dict]],
    metric: str
) -> dict[str, list[float]]:
    """
    Extract per-trial scores for a given metric from judge outputs.

    Args:
        scenario_data: Dict mapping scenario_id -> list of judge outputs
        metric: The metric name to extract

    Returns:
        Dict mapping scenario_id -> list of trial scores
    """
    def _trial_score(trial: dict) -> float:
        # Missing metrics and explicit nulls both count as 0.0.
        value = trial.get("flat_scores", {}).get(metric)
        return 0.0 if value is None else float(value)

    scenario_trials: dict[str, list[float]] = {}
    for scenario_id, trials in scenario_data.items():
        if trials:
            scenario_trials[scenario_id] = [_trial_score(t) for t in trials]
    return scenario_trials
def get_runs_stats(scenario_data: dict[str, list], min_runs_required: int) -> tuple[int, int, int, int]:
    """Get run statistics: (n_scenarios, min_runs, max_runs, n_qualifying)."""
    if not scenario_data:
        # No scenarios at all: every statistic is zero.
        return 0, 0, 0, 0
    counts = [len(trials) for trials in scenario_data.values()]
    qualifying = len([c for c in counts if c >= min_runs_required])
    return len(counts), min(counts), max(counts), qualifying
def filter_scenarios_with_min_runs(scenario_data: dict[str, list], min_runs_required: int) -> dict[str, list]:
    """Filter to only include scenarios with >= min_runs_required runs."""
    kept: dict[str, list] = {}
    for scenario_id, trials in scenario_data.items():
        if len(trials) >= min_runs_required:
            kept[scenario_id] = trials
    return kept
| def find_latest_rollout_file(trial_dir: Path) -> Path: | |
| """Find the latest rollout file in a trial's sessions directory or session.jsonl.""" | |
| # First check for session.jsonl directly in trial_dir (new structure) | |
| session_file = trial_dir / "session.jsonl" | |
| if session_file.exists(): | |
| return session_file | |
| # Fall back to sessions/rollout-*.jsonl (old structure) | |
| sessions_dir = trial_dir / "sessions" | |
| if not sessions_dir.exists(): | |
| return None | |
| rollout_files = [] | |
| for rollout_file in sessions_dir.rglob("rollout-*.jsonl"): | |
| rollout_files.append(rollout_file) | |
| if not rollout_files: | |
| return None | |
| # Sort by modification time and return the latest | |
| return max(rollout_files, key=lambda p: p.stat().st_mtime) | |