# Provenance: commit 1db0756 ("fix: path to session.jsonl") by rohan-arora-ibm (unverified)
import json
from pathlib import Path
from typing import Optional
# Model display names (short for figures)
# Follows ArtificialAnalysis.ai naming conventions
# Keys are the identifiers left over after get_model_name() strips the agent
# prefix and run-hash suffix from a result-directory name; several spellings
# of the same model (provider-prefixed and bare) map to one display label.
MODEL_DISPLAY_NAMES = {
    # OpenAI / Azure
    "Azure_gpt-5.1-2025-11-13": "GPT-5.1",
    "Azure_gpt-5.1-chat-2025-11-13": "GPT-5.1",
    "Azure_o4-mini": "o4-mini",
    "Azure_gpt-4o": "GPT-4o",
    "openai_gpt-oss-120b": "GPT-OSS-120B",
    "openai_gpt-oss-20b": "GPT-OSS-20B",
    # Google / GCP
    "GCP_gemini-2.5-pro": "Gemini 2.5 Pro",
    "gemini-2.5-pro": "Gemini 2.5 Pro",
    "gcp_gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-pro-preview": "Gemini 3 Pro",
    "gemini-3-flash-preview": "Gemini 3 Flash",
    "google_gemini-3-flash-preview": "Gemini 3 Flash",
    # Moonshot AI
    "moonshotai_kimi-k2-thinking": "Kimi K2",
    "kimi-k2-thinking": "Kimi K2",
    # Anthropic / AWS
    "aws_claude-opus-4-5": "Claude Opus 4.5",
    # Mistral AI
    "mistralai_mistral-large-2512": "Mistral Large",
    # Alibaba / Qwen
    "qwen_qwen3-vl-32b-instruct": "Qwen3-VL-32B",
    # ServiceNow
    "ServiceNow-AI_Apriel-1.6-15b-Thinker": "Apriel-1.6-15B",
}
def get_model_name(dirname: str) -> str:
    """Translate a result-directory name into a short model display name.

    Strips the "react with code_" agent prefix and the "_07ccdb1" run-hash
    suffix, then looks the remainder up in MODEL_DISPLAY_NAMES; if there is
    no entry, the stripped identifier itself is returned.
    """
    without_prefix = dirname.replace("react with code_", "")
    model_key = without_prefix.replace("_07ccdb1", "")
    return MODEL_DISPLAY_NAMES.get(model_key, model_key)
def find_react_with_code_dirs(leaderboard_dir: Path) -> list[Path]:
    """Find all agent directories (excluding hidden and backup directories)."""
    # Keep only real directories whose names are neither hidden (".") nor
    # backups ("backup_"); str.startswith accepts a tuple of prefixes.
    agent_dirs = [
        entry
        for entry in leaderboard_dir.iterdir()
        if entry.is_dir() and not entry.name.startswith((".", "backup_"))
    ]
    return sorted(agent_dirs)
def read_judge_outputs_from_dir(agent_dir: Path) -> dict[str, list[dict]]:
    """
    Read all judge_output.json files from an agent directory.

    Expected layout is agent_dir/Scenario-*/<trial>/judge_output.json; if no
    "Scenario*" folders sit directly under agent_dir, the search descends one
    level (e.g. agent_dir/sre/Scenario-1) before scanning. Unreadable or
    malformed judge_output.json files are skipped with a printed warning
    (best-effort read, never raises for a single bad file).

    Args:
        agent_dir: Root directory of one agent's results.

    Returns:
        Dict mapping scenario_id -> list of judge outputs (one per trial)
    """
    scenario_data = {}
    # Check if directory contains Scenario folders directly, or if we need to go one level deeper
    # (e.g., agent_dir/sre/Scenario-1, agent_dir/finops/Scenario-1, etc.)
    has_scenarios = any(d.name.startswith("Scenario") for d in agent_dir.iterdir() if d.is_dir())
    if not has_scenarios:
        # Look for subdirectories that might contain scenarios (sre, finops, etc.)
        subdirs = [d for d in agent_dir.iterdir() if d.is_dir() and not d.name.startswith(".")]
        if len(subdirs) == 1:
            # If there's exactly one subdirectory, use it
            agent_dir = subdirs[0]
        elif len(subdirs) > 1:
            # If there are multiple, try to find one with Scenario folders
            # (first match wins; if none match, agent_dir is left unchanged
            # and the scan below simply finds no scenarios)
            for subdir in subdirs:
                if any(d.name.startswith("Scenario") for d in subdir.iterdir() if d.is_dir()):
                    agent_dir = subdir
                    break
    for scenario_dir in agent_dir.iterdir():
        if not scenario_dir.is_dir() or not scenario_dir.name.startswith("Scenario"):
            continue
        scenario_id = scenario_dir.name
        trials = []
        # Look for trial subdirectories (1, 2, 3, etc.); sorted() gives a
        # deterministic (lexicographic) trial order.
        for trial_dir in sorted(scenario_dir.iterdir()):
            if not trial_dir.is_dir():
                continue
            judge_file = trial_dir / "judge_output.json"
            if judge_file.exists():
                try:
                    with open(judge_file) as f:
                        judge_data = json.load(f)
                    trials.append(judge_data)
                except Exception as e:
                    # Deliberate best-effort: a corrupt trial file must not
                    # abort aggregation of the remaining trials.
                    print(f"  Warning: Error reading {judge_file}: {e}")
        # Scenarios with no readable trials are omitted entirely.
        if trials:
            scenario_data[scenario_id] = trials
    return scenario_data
def extract_trial_scores_from_judge_outputs(
    scenario_data: dict[str, list[dict]],
    metric: str
) -> dict[str, list[float]]:
    """
    Extract per-trial scores for a given metric from judge outputs.

    Args:
        scenario_data: Dict mapping scenario_id -> list of judge outputs
        metric: The metric name to extract

    Returns:
        Dict mapping scenario_id -> list of trial scores (missing or null
        metric values are recorded as 0.0; scenarios with no trials are
        dropped)
    """
    scenario_trials: dict[str, list[float]] = {}
    for scenario_id, trials in scenario_data.items():
        # Pull the raw metric value out of each trial's flat_scores mapping.
        raw_values = (trial.get("flat_scores", {}).get(metric) for trial in trials)
        # Missing/null metrics count as 0.0 so every trial contributes a score.
        trial_scores = [0.0 if value is None else float(value) for value in raw_values]
        if trial_scores:
            scenario_trials[scenario_id] = trial_scores
    return scenario_trials
def get_runs_stats(scenario_data: dict[str, list], min_runs_required: int) -> tuple[int, int, int, int]:
    """Get run statistics: (n_scenarios, min_runs, max_runs, n_qualifying)."""
    # Empty input short-circuits to all-zero stats (min/max of [] would raise).
    if not scenario_data:
        return 0, 0, 0, 0
    counts = [len(trials) for trials in scenario_data.values()]
    qualifying = len([c for c in counts if c >= min_runs_required])
    return len(counts), min(counts), max(counts), qualifying
def filter_scenarios_with_min_runs(scenario_data: dict[str, list], min_runs_required: int) -> dict[str, list]:
    """Filter to only include scenarios with >= min_runs_required runs."""
    kept: dict[str, list] = {}
    for scenario_id, trials in scenario_data.items():
        if len(trials) >= min_runs_required:
            kept[scenario_id] = trials
    return kept
def find_latest_rollout_file(trial_dir: Path) -> Optional[Path]:
    """Find the latest rollout file in a trial's sessions directory or session.jsonl.

    Prefers ``session.jsonl`` directly inside *trial_dir* (new layout); falls
    back to the most recently modified ``sessions/**/rollout-*.jsonl`` file
    (old layout).

    Args:
        trial_dir: Directory of a single trial run.

    Returns:
        Path to the log file, or None if neither layout yields a file.
        (Original annotation claimed a bare ``Path`` but both fallback
        branches return None.)
    """
    # First check for session.jsonl directly in trial_dir (new structure)
    session_file = trial_dir / "session.jsonl"
    if session_file.exists():
        return session_file
    # Fall back to sessions/rollout-*.jsonl (old structure)
    sessions_dir = trial_dir / "sessions"
    if not sessions_dir.exists():
        return None
    rollout_files = list(sessions_dir.rglob("rollout-*.jsonl"))
    if not rollout_files:
        return None
    # Several rollouts can exist (e.g. retries); the newest by mtime wins.
    return max(rollout_files, key=lambda p: p.stat().st_mtime)