import json
from pathlib import Path
from typing import Optional, Union

import pandas as pd

# Model display names (short for figures)
# Follows ArtificialAnalysis.ai naming conventions
MODEL_DISPLAY_NAMES = {
    # OpenAI / Azure
    "Azure_gpt-5.1-2025-11-13": "GPT-5.1",
    "Azure_gpt-5.1-chat-2025-11-13": "GPT-5.1",
    "Azure_o4-mini": "o4-mini",
    "Azure_gpt-4o": "GPT-4o",
    "openai_gpt-oss-120b": "GPT-OSS-120B",
    "openai_gpt-oss-20b": "GPT-OSS-20B",
    # Google / GCP
    "GCP_gemini-2.5-pro": "Gemini 2.5 Pro",
    "gemini-2.5-pro": "Gemini 2.5 Pro",
    "gcp_gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-flash-preview": "Gemini 3 Flash",
    "google_gemini-3-flash-preview": "Gemini 3 Flash",
    # Moonshot AI
    "moonshotai_kimi-k2-thinking": "Kimi K2",
    "kimi-k2-thinking": "Kimi K2",
    # Anthropic / AWS
    "aws_claude-opus-4-5": "Claude Opus 4.5",
    # Mistral AI
    "mistralai_mistral-large-2512": "Mistral Large",
    # Alibaba / Qwen
    "qwen_qwen3-vl-32b-instruct": "Qwen3-VL-32B",
    # ServiceNow
    "ServiceNow-AI_Apriel-1.6-15b-Thinker": "Apriel-1.6-15B",
}


def get_model_name(dirname: str) -> str:
    """Map an agent directory name to its short display name.

    The literal fragments ``"react with code_"`` and ``"_07ccdb1"`` are
    removed wherever they occur in ``dirname``; the remainder is looked up in
    ``MODEL_DISPLAY_NAMES``.

    Args:
        dirname: Name of the agent directory.

    Returns:
        The display name, or the stripped directory name when no mapping
        exists.
    """
    name = dirname.replace("react with code_", "").replace("_07ccdb1", "")
    return MODEL_DISPLAY_NAMES.get(name, name)


def find_react_with_code_dirs(leaderboard_dir: Path) -> list[Path]:
    """List the agent directories directly under ``leaderboard_dir``.

    Entries whose name starts with ``"."`` or ``"backup_"`` and entries that
    are not directories are skipped.

    Args:
        leaderboard_dir: Directory to scan (non-recursively).

    Returns:
        The matching directories in sorted order.
    """
    return sorted(
        entry
        for entry in leaderboard_dir.iterdir()
        if entry.is_dir() and not entry.name.startswith((".", "backup_"))
    )


def _has_scenario_dirs(directory: Path) -> bool:
    """Return True if ``directory`` directly contains a ``Scenario*`` folder."""
    return any(
        child.is_dir() and child.name.startswith("Scenario")
        for child in directory.iterdir()
    )


def _resolve_scenario_root(agent_dir: Path) -> Path:
    """Pick the directory that actually holds the ``Scenario*`` folders."""
    if _has_scenario_dirs(agent_dir):
        return agent_dir

    # Scenarios may live one level deeper (e.g. agent_dir/sre/Scenario-1).
    # Sort the candidates so the choice does not depend on the arbitrary
    # order in which the filesystem lists them.
    subdirs = sorted(
        child
        for child in agent_dir.iterdir()
        if child.is_dir() and not child.name.startswith(".")
    )
    if len(subdirs) == 1:
        return subdirs[0]
    for subdir in subdirs:
        if _has_scenario_dirs(subdir):
            return subdir
    return agent_dir


def _trial_sort_key(trial_dir: Path) -> tuple[bool, int, Path]:
    """Order numeric trial folders by value ("2" before "10"), others after."""
    name = trial_dir.name
    if name.isdecimal():
        return (False, int(name), trial_dir)
    return (True, 0, trial_dir)


def read_judge_outputs_from_dir(agent_dir: Path) -> dict[str, list[dict]]:
    """Read all ``judge_output.json`` files from an agent directory.

    The expected layout is ``<root>/Scenario*/<trial>/judge_output.json``
    where ``<root>`` is either ``agent_dir`` itself or one of its
    sub-directories (e.g. ``agent_dir/sre``). Files that cannot be read or
    parsed are reported with a printed warning and skipped.

    Args:
        agent_dir: Directory of a single agent.

    Returns:
        Dict mapping scenario_id (the ``Scenario*`` folder name) to the list
        of parsed judge outputs, one per trial, ordered by trial folder
        (numeric folder names in numeric order). Scenarios without any
        readable judge output are omitted.
    """
    scenario_root = _resolve_scenario_root(agent_dir)
    scenario_data: dict[str, list[dict]] = {}

    for scenario_dir in scenario_root.iterdir():
        if not scenario_dir.is_dir() or not scenario_dir.name.startswith("Scenario"):
            continue

        trial_dirs = sorted(
            (entry for entry in scenario_dir.iterdir() if entry.is_dir()),
            key=_trial_sort_key,
        )
        trials = []
        for trial_dir in trial_dirs:
            judge_file = trial_dir / "judge_output.json"
            if not judge_file.exists():
                continue
            try:
                # JSON is UTF-8 by specification; do not rely on the locale.
                with open(judge_file, encoding="utf-8") as f:
                    trials.append(json.load(f))
            except (OSError, ValueError) as e:
                # ValueError covers both JSON and Unicode decoding errors.
                print(f" Warning: Error reading {judge_file}: {e}")

        if trials:
            scenario_data[scenario_dir.name] = trials

    return scenario_data


def extract_trial_scores_from_judge_outputs(
    scenario_data: dict[str, list[dict]], metric: str
) -> dict[str, list[float]]:
    """Extract per-trial scores for a given metric from judge outputs.

    Each judge output is expected to carry a ``"flat_scores"`` mapping. A
    missing or null mapping, or a missing or null metric value, counts as a
    score of ``0.0``.

    Args:
        scenario_data: Dict mapping scenario_id -> list of judge outputs.
        metric: The metric name to extract.

    Returns:
        Dict mapping scenario_id -> list of trial scores. Scenarios with no
        trials are omitted.
    """
    scenario_trials: dict[str, list[float]] = {}
    for scenario_id, trials in scenario_data.items():
        scores = []
        for trial in trials:
            # `or {}` also handles an explicit `"flat_scores": null`.
            flat_scores = trial.get("flat_scores") or {}
            score = flat_scores.get(metric)
            scores.append(0.0 if score is None else float(score))
        if scores:
            scenario_trials[scenario_id] = scores
    return scenario_trials


def get_runs_stats(
    scenario_data: dict[str, list], min_runs_required: int
) -> tuple[int, int, int, int]:
    """Summarise how many runs each scenario has.

    Args:
        scenario_data: Dict mapping scenario_id -> list of runs.
        min_runs_required: Threshold for a scenario to count as qualifying.

    Returns:
        ``(n_scenarios, min_runs, max_runs, n_qualifying)``; all zeros when
        ``scenario_data`` is empty.
    """
    if not scenario_data:
        return 0, 0, 0, 0

    run_counts = [len(trials) for trials in scenario_data.values()]
    n_qualifying = sum(1 for count in run_counts if count >= min_runs_required)
    return len(scenario_data), min(run_counts), max(run_counts), n_qualifying


def filter_scenarios_with_min_runs(
    scenario_data: dict[str, list], min_runs_required: int
) -> dict[str, list]:
    """Keep only the scenarios that have at least ``min_runs_required`` runs."""
    return {
        scenario_id: trials
        for scenario_id, trials in scenario_data.items()
        if len(trials) >= min_runs_required
    }


def find_latest_rollout_file(trial_dir: Path) -> Optional[Path]:
    """Locate the session log of a trial.

    ``trial_dir/session.jsonl`` (new structure) takes precedence. Otherwise
    ``trial_dir/sessions`` is searched recursively for ``rollout-*.jsonl``
    (old structure) and the most recently modified match is returned.

    Args:
        trial_dir: Directory of a single trial.

    Returns:
        Path to the log file, or ``None`` when nothing is found.
    """
    session_file = trial_dir / "session.jsonl"
    if session_file.exists():
        return session_file

    sessions_dir = trial_dir / "sessions"
    if not sessions_dir.exists():
        return None

    rollout_files = list(sessions_dir.rglob("rollout-*.jsonl"))
    if not rollout_files:
        return None
    return max(rollout_files, key=lambda p: p.stat().st_mtime)


def json_to_filtered_df(path: Union[str, Path]) -> pd.DataFrame:
    """Load a .json or .jsonl file into a filtered, time-sorted DataFrame.

    Only rows whose ``payload.type`` is in ``DESIRED_TYPES`` are kept, the
    columns are restricted to ``USEFUL_COLS`` (missing ones are added as
    ``pd.NA``), and rows are sorted by timestamp ascending.

    Args:
        path: Path to the JSON or JSON Lines file. Files with a ``.jsonl`` or
            ``.ndjson`` suffix are read line by line; anything else is read
            as a single JSON document.

    Returns:
        Tidied DataFrame ready for analysis/labs.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        KeyError: If the flattened data has no ``payload.type`` column.
    """
    DESIRED_TYPES = {
        "agent_message",
        "function_call",
        "function_call_output",
    }

    # Union of all "useful" columns
    USEFUL_COLS = [
        "timestamp",
        "payload.type",
        "payload.message",
        "payload.role",
        "payload.content",
        "payload.name",
        "payload.arguments",
        "payload.call_id",
        "payload.output",
    ]

    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"{path} does not exist")

    # 1. Load the records
    with path.open("r", encoding="utf-8") as f:
        if path.suffix.lower() in {".jsonl", ".ndjson"}:
            records = [json.loads(line) for line in f if line.strip()]
        else:
            data = json.load(f)
            records = data if isinstance(data, list) else [data]

    # 2. Flatten nested JSON (nested keys become dotted column names)
    df = pd.json_normalize(records)

    # 3. Filter by payload.type
    if "payload.type" not in df.columns:
        raise KeyError("'payload.type' column missing from data")
    df = df[df["payload.type"].isin(DESIRED_TYPES)].copy()

    # 4. Ensure all useful columns exist (add empty if missing)
    for col in USEFUL_COLS:
        if col not in df.columns:
            df[col] = pd.NA

    # 5. Subset to useful columns only. Copy explicitly so the assignment
    #    below acts on an independent frame and cannot trigger pandas'
    #    SettingWithCopyWarning.
    df = df[USEFUL_COLS].copy()

    # 6. Sort by timestamp (unparseable timestamps become NaT)
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    return df.sort_values("timestamp", ignore_index=True)