#!/usr/bin/env python3
"""
Extract tool call failure data for all 'react with code' agents.

This script reads rollout JSONL files to identify and categorize tool call failures.
"""

import json
import re
import sys
import ast
from pathlib import Path
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from analysis_src.utils import (
    get_model_name,
    find_react_with_code_dirs,
    get_runs_stats,
    filter_scenarios_with_min_runs,
    find_latest_rollout_file
)
from analysis_src.model_styles import (
    get_model_style,
    get_color_palette,
    MIN_FONT_SIZE,
    SINGLE_COLUMN_WIDTH,
    DOUBLE_COLUMN_WIDTH,
    _COLORS,
    PLOT_PARAMETERS
)

# Paths
LEADERBOARD_DIR = PROJECT_ROOT / "ITBench-SRE-Agent" / "ITBench-Trajectories" / "ReAct-Agent-Trajectories"
RESULTS_JSON_DIR = LEADERBOARD_DIR / "results"
OUTPUT_DIR = PROJECT_ROOT / "ITBench-SRE-Agent" / "ITBench-Trajectories" / "output" / "tool_failures"

# Minimum runs per scenario required
MIN_RUNS_PER_SCENARIO = 3
MIN_QUALIFYING_SCENARIOS = 20

# Failure type patterns.
# Order matters: classify_failure() walks this dict in insertion order and
# returns on the first pattern that matches (case-insensitively), so the very
# broad 'runtime' patterns must stay last.
FAILURE_PATTERNS = {
    'python_syntax': [
        r'SyntaxError',
        r'IndentationError',
        r'TabError',
    ],
    'python_type': [
        r'TypeError',
        r'AttributeError',
        r'ValueError',
        r'KeyError',
        r'IndexError',
    ],
    'python_name': [
        r'NameError',
        r'UnboundLocalError',
        r'ModuleNotFoundError',
        r'ImportError',
    ],
    'file_not_found': [
        r'FileNotFoundError',
        r'No such file or directory',
        r'ENOENT',
        r'path does not exist',
    ],
    'permission_denied': [
        r'PermissionError',
        r'Permission denied',
        r'EACCES',
    ],
    'json_parse': [
        r'JSONDecodeError',
        r'json\.decoder\.JSONDecodeError',
        r'Expecting value',
        r'Invalid JSON',
    ],
    'timeout': [
        r'TimeoutError',
        r'timeout',
        r'Timed out',
        r'deadline exceeded',
    ],
    'memory': [
        r'MemoryError',
        r'out of memory',
        # Leading word boundary: patterns are matched case-insensitively, so a
        # bare 'OOM' would also hit words such as "room" or "zoom". No trailing
        # boundary so that "OOMKilled" / "oom-killer" still match.
        r'\bOOM',
        r'Cannot allocate memory',
    ],
    'connection': [
        r'ConnectionError',
        r'ConnectionRefusedError',
        r'Connection refused',
        r'ECONNREFUSED',
    ],
    'shell_command': [
        r'command not found',
        r'No such command',
        r'not recognized as',
    ],
    'assertion': [
        r'AssertionError',
    ],
    'runtime': [
        r'RuntimeError',
        r'Exception',
        r'Error:',
    ],
}


def classify_failure(output: str) -> tuple[str, str]:
    """
    Classify a failure based on the output string.

    Patterns from FAILURE_PATTERNS are tried in definition order with
    case-insensitive matching; the first hit wins.

    Args:
        output: Text produced by the failed tool call. ``None`` is treated as
            an empty string and other non-string values are converted with
            ``str()``.

    Returns:
        (category, specific_error) where ``specific_error`` is the text that
        matched. When nothing matches, ``('other_python', 'Unknown Python Error')``
        is returned if the text contains ``'Traceback'``, otherwise
        ``('other', 'Unknown Error')``.
    """
    if not isinstance(output, str):
        output = '' if output is None else str(output)

    for category, patterns in FAILURE_PATTERNS.items():
        for pattern in patterns:
            match = re.search(pattern, output, re.IGNORECASE)
            if match:
                return (category, match.group(0))

    # Check for generic traceback
    if 'Traceback' in output:
        return ('other_python', 'Unknown Python Error')

    return ('other', 'Unknown Error')


def _as_text(value) -> str:
    """Return *value* as a string, mapping ``None`` to the empty string."""
    if value is None:
        return ''
    return value if isinstance(value, str) else str(value)


def _detect_failure(output) -> Optional[dict]:
    """
    Decide whether a ``function_call_output`` payload represents a failure.

    Args:
        output: The payload's ``output`` field. Normally a string that is
            either JSON or raw text; a dict is treated as already-decoded JSON.

    Returns:
        ``None`` when the output is not considered a failure, otherwise a dict
        with keys ``exit_code``, ``category``, ``error`` and ``output_snippet``
        (at most 300 characters).
    """
    if isinstance(output, str):
        try:
            output_data = json.loads(output)
        except json.JSONDecodeError:
            # Not JSON, check for error patterns in raw output
            if 'Error' in output or 'error' in output or 'Traceback' in output:
                category, error = classify_failure(output)
                return {
                    'exit_code': None,
                    'category': category,
                    'error': error,
                    'output_snippet': output[:300],
                }
            return None
    else:
        output_data = output

    # Only structured (dict) outputs carry an exit code we can inspect.
    if not isinstance(output_data, dict):
        return None

    metadata = output_data.get('metadata')
    exit_code = metadata.get('exit_code', 0) if isinstance(metadata, dict) else 0
    if exit_code == 0:
        return None

    output_text = _as_text(output_data.get('output', ''))
    category, error = classify_failure(output_text)
    return {
        'exit_code': exit_code,
        'category': category,
        'error': error,
        'output_snippet': output_text[:300],
    }


def extract_tool_calls_from_rollout(rollout_file: Path) -> Optional[dict]:
    """
    Extract all tool calls and their outcomes from a rollout file.

    The file is read line by line as JSONL. Only records whose ``type`` is
    ``'response_item'`` are considered; ``function_call`` payloads register a
    call and ``function_call_output`` payloads are checked for failure and
    matched back to their call through ``call_id``. Lines that are not valid
    JSON, or whose structure is not the expected dict shape, are skipped.

    Args:
        rollout_file: Path to the rollout JSONL file.

    Returns:
        ``None`` if the file could not be processed (a warning is printed),
        otherwise a dict with:
        - total_tool_calls: int
        - failed_tool_calls: int
        - failures: list of failure details
        - tool_call_counts: dict of tool_name -> count
        - tool_failure_counts: dict of tool_name -> failure_count
    """
    tool_calls = {}  # call_id -> {name, arguments, timestamp}
    total_calls = 0
    failed_calls = 0
    failures = []
    tool_call_counts = defaultdict(int)
    tool_failure_counts = defaultdict(int)

    try:
        with open(rollout_file, encoding='utf-8', errors='replace') as f:
            for line in f:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue

                if not isinstance(record, dict) or record.get('type') != 'response_item':
                    continue

                payload = record.get('payload')
                if not isinstance(payload, dict):
                    continue

                payload_type = payload.get('type', '')

                if payload_type == 'function_call':
                    call_id = payload.get('call_id', '')
                    name = payload.get('name', '')
                    tool_calls[call_id] = {
                        'name': name,
                        'arguments': payload.get('arguments', ''),
                        'timestamp': record.get('timestamp', ''),
                    }
                    total_calls += 1
                    tool_call_counts[name] += 1

                elif payload_type == 'function_call_output':
                    call_id = payload.get('call_id', '')
                    failure_info = _detect_failure(payload.get('output', ''))

                    # Outputs without a previously seen call are ignored.
                    if failure_info is not None and call_id in tool_calls:
                        call = tool_calls[call_id]
                        failed_calls += 1
                        tool_failure_counts[call['name']] += 1
                        failures.append({
                            'tool_name': call['name'],
                            'arguments': _as_text(call['arguments'])[:200],
                            'timestamp': call['timestamp'],
                            **failure_info,
                        })
    except Exception as e:
        # Best effort: an unreadable rollout is reported and skipped by the caller.
        print(f" Warning: Error reading {rollout_file}: {e}")
        return None

    return {
        'total_tool_calls': total_calls,
        'failed_tool_calls': failed_calls,
        'failures': failures,
        'tool_call_counts': dict(tool_call_counts),
        'tool_failure_counts': dict(tool_failure_counts),
    }


def _has_scenario_dirs(directory: Path) -> bool:
    """Return True if *directory* directly contains a sub-directory named ``Scenario*``."""
    return any(d.is_dir() and d.name.startswith("Scenario") for d in directory.iterdir())


def read_agent_stats(agent_dir: Path) -> dict[str, list[dict]]:
    """
    Read tool call stats from all scenarios/trials for an agent.

    Args:
        agent_dir: Agent directory. It either contains ``Scenario*`` folders
            directly or one level deeper (e.g. agent_dir/sre/Scenario-1).

    Returns:
        Dict mapping scenario_id -> list of stats (one per trial). Scenarios
        without any usable trial are omitted.
    """
    scenario_data = {}

    # Check if directory contains Scenario folders directly, or if we need to go one level deeper
    # (e.g., agent_dir/sre/Scenario-1, agent_dir/finops/Scenario-1, etc.)
    if not _has_scenario_dirs(agent_dir):
        # Look for subdirectories that might contain scenarios (sre, finops, etc.)
        subdirs = sorted(
            d for d in agent_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        )
        if len(subdirs) == 1:
            # If there's exactly one subdirectory, use it
            agent_dir = subdirs[0]
        elif len(subdirs) > 1:
            # If there are multiple, use the first one that has Scenario folders
            for subdir in subdirs:
                if _has_scenario_dirs(subdir):
                    agent_dir = subdir
                    break

    # Sorted so that scenario order (and therefore output row order) is deterministic.
    for scenario_dir in sorted(agent_dir.iterdir()):
        if not scenario_dir.is_dir() or not scenario_dir.name.startswith("Scenario"):
            continue

        trials = []
        for trial_dir in sorted(scenario_dir.iterdir()):
            if not trial_dir.is_dir():
                continue

            rollout_file = find_latest_rollout_file(trial_dir)
            if not rollout_file:
                continue

            stats = extract_tool_calls_from_rollout(rollout_file)
            if stats:
                trials.append(stats)

        if trials:
            scenario_data[scenario_dir.name] = trials

    return scenario_data


def extract_all_data() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Extract tool failure data for all agents.

    Agents are skipped when no rollout data is found or when fewer than
    MIN_QUALIFYING_SCENARIOS scenarios have at least MIN_RUNS_PER_SCENARIO runs
    (as reported by ``get_runs_stats``).

    Returns:
        - summary_df: Aggregated stats per model
        - detail_df: Per-trial failure stats
        - failures_df: Individual failure details
    """
    agent_dirs = find_react_with_code_dirs(LEADERBOARD_DIR)
    print(f"Found {len(agent_dirs)} 'react with code' agent directories")

    summary_records = []
    detail_records = []
    failure_records = []

    for agent_dir in tqdm(agent_dirs, desc="Processing agents"):
        model_name = get_model_name(agent_dir.name)
        print(f"\nProcessing: {agent_dir.name}")

        scenario_data = read_agent_stats(agent_dir)
        n_scenarios, _min_runs, _max_runs, n_qualifying = get_runs_stats(
            scenario_data, MIN_RUNS_PER_SCENARIO
        )

        if n_scenarios == 0:
            print(f" SKIPPING {model_name}: No rollout data found")
            continue

        if n_qualifying < MIN_QUALIFYING_SCENARIOS:
            print(f" SKIPPING {model_name}: Only {n_qualifying}/{n_scenarios} scenarios have {MIN_RUNS_PER_SCENARIO}+ runs")
            continue

        # Filter scenarios
        scenario_data = filter_scenarios_with_min_runs(scenario_data, MIN_RUNS_PER_SCENARIO)
        n_scenarios_filtered = len(scenario_data)

        print(f" Processing: {model_name} ({n_scenarios_filtered} scenarios)")

        # Aggregate across all scenarios and trials
        all_total_calls = []
        all_failed_calls = []
        all_failure_rates = []
        aggregated_tool_counts = defaultdict(int)
        aggregated_failure_counts = defaultdict(int)
        aggregated_category_counts = defaultdict(int)

        for scenario_id, trials in tqdm(scenario_data.items(), desc=f" {model_name} scenarios", leave=False):
            for trial_idx, trial in enumerate(trials):
                total = trial['total_tool_calls']
                failed = trial['failed_tool_calls']
                # A trial with no tool calls counts as a 0% failure rate.
                failure_rate = failed / total * 100 if total > 0 else 0

                all_total_calls.append(total)
                all_failed_calls.append(failed)
                all_failure_rates.append(failure_rate)

                for tool_name, count in trial['tool_call_counts'].items():
                    aggregated_tool_counts[tool_name] += count

                for tool_name, count in trial['tool_failure_counts'].items():
                    aggregated_failure_counts[tool_name] += count

                # Count failure categories
                for failure in trial['failures']:
                    category = failure.get('category', 'other')
                    aggregated_category_counts[category] += 1

                    # Add to failure records
                    failure_records.append({
                        'model': model_name,
                        'scenario': scenario_id,
                        'trial': trial_idx,
                        'tool_name': failure.get('tool_name', ''),
                        'category': category,
                        'error': failure.get('error', ''),
                        'exit_code': failure.get('exit_code'),
                        'output_snippet': failure.get('output_snippet', '')[:100],
                    })

                detail_records.append({
                    'model': model_name,
                    'scenario': scenario_id,
                    'trial': trial_idx,
                    'total_tool_calls': total,
                    'failed_tool_calls': failed,
                    'failure_rate_pct': failure_rate,
                })

        # Compute per-tool failure rates
        tool_failure_rates = {}
        for tool_name, total in aggregated_tool_counts.items():
            failures = aggregated_failure_counts.get(tool_name, 0)
            tool_failure_rates[tool_name] = {
                'total': total,
                'failures': failures,
                'rate': failures / total * 100 if total > 0 else 0
            }

        summary_records.append({
            'model': model_name,
            'n_scenarios': n_scenarios_filtered,
            'n_trials': len(all_total_calls),
            'total_tool_calls': sum(all_total_calls),
            'total_failed_calls': sum(all_failed_calls),
            'avg_tool_calls_per_trial': np.mean(all_total_calls),
            'avg_failed_calls_per_trial': np.mean(all_failed_calls),
            'avg_failure_rate_pct': np.mean(all_failure_rates),
            'std_failure_rate_pct': np.std(all_failure_rates),
            'failure_categories': dict(aggregated_category_counts),
            'tool_failure_rates': tool_failure_rates,
        })

    summary_df = pd.DataFrame(summary_records)
    detail_df = pd.DataFrame(detail_records)
    failures_df = pd.DataFrame(failure_records)

    return summary_df, detail_df, failures_df


def save_data(summary_df: pd.DataFrame, detail_df: pd.DataFrame, failures_df: pd.DataFrame):
    """Save extracted data to CSV files under OUTPUT_DIR (created if missing)."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    summary_path = OUTPUT_DIR / "tool_failures_summary.csv"
    detail_path = OUTPUT_DIR / "tool_failures_detail.csv"
    failures_path = OUTPUT_DIR / "tool_failures_individual.csv"

    summary_df.to_csv(summary_path, index=False)
    detail_df.to_csv(detail_path, index=False)
    failures_df.to_csv(failures_path, index=False)

    print("\nData saved to:")
    print(f" - {summary_path}")
    print(f" - {detail_path}")
    print(f" - {failures_path}")


def print_summary(summary_df: pd.DataFrame):
    """Print a per-model summary table, highest average failure rate first."""
    print("\n" + "="*100)
    print("Tool Call Failure Summary")
    print("="*100)

    summary_df = summary_df.sort_values("avg_failure_rate_pct", ascending=False)

    print(f"\n{'Model':<20} {'Trials':>8} {'Total Calls':>12} {'Failed':>10} {'Fail Rate':>10} {'Top Category':>20}")
    print("-" * 85)

    for _, row in summary_df.iterrows():
        # Accept both in-memory dicts and the stringified form a CSV round-trip produces.
        categories = parse_dict_column(row.get('failure_categories', {}))
        if categories:
            top_cat = max(categories, key=categories.get)
            top_cat_count = categories[top_cat]
        else:
            top_cat = '-'
            top_cat_count = 0

        print(f"{row['model']:<20} {row['n_trials']:>8} {row['total_tool_calls']:>12} "
              f"{row['total_failed_calls']:>10} {row['avg_failure_rate_pct']:>9.2f}% "
              f"{top_cat} ({top_cat_count})")


def plot_failure_rate_by_model(summary_df: pd.DataFrame):
    """
    Figure 1: Overall failure rate per model (horizontal bar chart).

    Bars show ``avg_failure_rate_pct`` with ``std_failure_rate_pct`` as error
    bars. The figure is written to OUTPUT_DIR / "fig_failure_rate_by_model.png"
    and then shown.
    """
    plt.rcParams.update(PLOT_PARAMETERS)

    fig, ax = plt.subplots(figsize=(DOUBLE_COLUMN_WIDTH, 3.0))

    data = summary_df.sort_values("avg_failure_rate_pct", ascending=True)

    colors = get_color_palette(len(data))

    bars = ax.barh(data["model"], data["avg_failure_rate_pct"],
                   color=colors, edgecolor='black', linewidth=0.5)

    # Add error bars for std
    ax.errorbar(data["avg_failure_rate_pct"], range(len(data)),
                xerr=data["std_failure_rate_pct"],
                fmt='none', color='black', capsize=2, linewidth=0.5)

    # Add value labels just past the end of each error bar
    for bar, val, std in zip(bars, data["avg_failure_rate_pct"], data["std_failure_rate_pct"]):
        ax.text(val + std + 0.5, bar.get_y() + bar.get_height()/2,
                f'{val:.1f}%', va='center', ha='left', fontsize=MIN_FONT_SIZE - 1)

    ax.set_xlabel("Average Failure Rate (%)")
    ax.set_xlim(0, data["avg_failure_rate_pct"].max() + data["std_failure_rate_pct"].max() + 5)

    plt.title("Tool Call Failure Rate")
    plt.tight_layout()

    # Save before showing so the file is written even if the interactive
    # window is closed, and make sure the target directory exists.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    fig.savefig(OUTPUT_DIR / "fig_failure_rate_by_model.png")
    plt.show()
    plt.close(fig)
    print("Saved: fig_failure_rate_by_model.png")


def parse_dict_column(col_str):
    """
    Parse a dictionary column stored as string.

    Args:
        col_str: A dict (returned unchanged), or the string form of a dict
            literal as produced when a DataFrame holding dicts is written to CSV.

    Returns:
        The parsed dict, or ``{}`` for missing values, non-string input,
        unparsable text, or literals that are not dicts.
    """
    if isinstance(col_str, dict):
        return col_str
    if not isinstance(col_str, str) or col_str == '{}':
        # Covers NaN / None coming from empty CSV cells.
        return {}
    try:
        parsed = ast.literal_eval(col_str)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


def plot_failure_categories_stacked(summary_df: pd.DataFrame):
    """
    Figure 2: Failure category breakdown per model (stacked bar).

    Shows the eight categories with the highest total count across all models,
    one horizontal stacked bar per model ordered by ``total_failed_calls``.
    The figure is written to OUTPUT_DIR / "fig_failure_categories_stacked.png"
    and then shown.
    """
    plt.rcParams.update(PLOT_PARAMETERS)

    fig, ax = plt.subplots(figsize=(DOUBLE_COLUMN_WIDTH, 3.0))

    # Parse failure categories (no-op for in-memory dicts, decodes CSV strings)
    data = summary_df.copy()
    data['failure_categories'] = data['failure_categories'].apply(parse_dict_column)

    # Get all categories and their totals across models
    all_categories = defaultdict(int)
    for cats in data['failure_categories']:
        for cat, count in cats.items():
            all_categories[cat] += count

    CATEGORY_COLORS = {
        'python_syntax': '#e41a1c',
        'python_type': '#377eb8',
        'python_name': '#4daf4a',
        'file_not_found': '#984ea3',
        'json_parse': '#ff7f00',
        'shell_command': '#a65628',
        'timeout': '#f781bf',
        'memory': '#999999',
        'permission_denied': '#fc8d62',
        'connection': '#a6d854',
        'assertion': '#ffd92f',
        'runtime': '#e5c494',
        'other_python': '#66c2a5',
        'other': '#8da0cb',
    }

    # Sort categories by total count and keep the top 8
    sorted_cats = sorted(all_categories.keys(), key=lambda x: all_categories[x], reverse=True)[:8]

    # Build data for stacked bar
    data = data.sort_values('total_failed_calls', ascending=True)

    bottom = np.zeros(len(data))
    for cat in sorted_cats:
        values = np.array(
            [cats.get(cat, 0) for cats in data['failure_categories']],
            dtype=float,
        )
        color = CATEGORY_COLORS.get(cat, '#888888')
        ax.barh(data['model'], values, left=bottom, label=cat.replace('_', ' ').title(),
                color=color, edgecolor='white', linewidth=0.3)
        # Rebind instead of mutating in place so earlier bars never share the array.
        bottom = bottom + values

    ax.set_xlabel("Number of Failed Tool Calls")
    ax.legend(loc='lower right', ncol=2, fontsize=MIN_FONT_SIZE - 1,
              framealpha=0.9, bbox_to_anchor=(1.0, 0.0))

    plt.title("Tool Failure Category Distribution")
    plt.tight_layout()

    # Save before showing so the file is written even if the interactive
    # window is closed, and make sure the target directory exists.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    fig.savefig(OUTPUT_DIR / "fig_failure_categories_stacked.png")
    plt.show()
    plt.close(fig)
    print("Saved: fig_failure_categories_stacked.png")


def main():
    """Extract failure data, save it as CSV, print the summary and plot the category breakdown."""
    print("Extracting tool call failure data for 'react with code' agents...")
    print(f"Reading from directories: {LEADERBOARD_DIR}")
    print(f"Output directory: {OUTPUT_DIR}")

    summary_df, detail_df, failures_df = extract_all_data()

    if len(summary_df) == 0:
        print("No data extracted!")
        return

    save_data(summary_df, detail_df, failures_df)
    print_summary(summary_df)

    # NOTE(review): plot_failure_rate_by_model (Figure 1) is defined but not
    # called here — confirm whether that is intentional.
    plot_failure_categories_stacked(summary_df)


if __name__ == "__main__":
    main()