# ITBench-Lite / analysis_src/extract_tool_failures.py
# Upstream page header (rohan-arora-ibm): "fix: getting the trajectories when
# the folder hierarchy has domain" — commit 7f74217 (unverified).
#!/usr/bin/env python3
"""
Extract tool call failure data for all 'react with code' agents.
This script reads rollout JSONL files to identify and categorize tool call failures.
"""
import json
import re
import sys
import ast
from pathlib import Path
from dataclasses import dataclass, field
from collections import defaultdict
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from analysis_src.utils import (
get_model_name,
find_react_with_code_dirs,
get_runs_stats,
filter_scenarios_with_min_runs,
find_latest_rollout_file
)
from analysis_src.model_styles import (
get_model_style, get_color_palette, MIN_FONT_SIZE, SINGLE_COLUMN_WIDTH, DOUBLE_COLUMN_WIDTH, _COLORS, PLOT_PARAMETERS
)
# Paths
# Root of the trajectories checkout holding per-agent run directories.
LEADERBOARD_DIR = PROJECT_ROOT / "ITBench-SRE-Agent" / "ITBench-Trajectories" / "ReAct-Agent-Trajectories"
# Per-run results JSON; defined for reference — not read in the visible code.
RESULTS_JSON_DIR = LEADERBOARD_DIR / "results"
# All CSVs and figures produced by this script land here.
OUTPUT_DIR = PROJECT_ROOT / "ITBench-SRE-Agent" / "ITBench-Trajectories" / "output" / "tool_failures"
# Minimum runs per scenario required for a scenario to qualify.
MIN_RUNS_PER_SCENARIO = 3
# A model is included only if at least this many scenarios qualify.
MIN_QUALIFYING_SCENARIOS = 20
# Failure type patterns.
# Order matters: categories are checked in insertion order, so the specific
# ones (e.g. 'python_syntax') must precede the catch-all 'runtime' patterns.
FAILURE_PATTERNS = {
    'python_syntax': [
        r'SyntaxError',
        r'IndentationError',
        r'TabError',
    ],
    'python_type': [
        r'TypeError',
        r'AttributeError',
        r'ValueError',
        r'KeyError',
        r'IndexError',
    ],
    'python_name': [
        r'NameError',
        r'UnboundLocalError',
        r'ModuleNotFoundError',
        r'ImportError',
    ],
    'file_not_found': [
        r'FileNotFoundError',
        r'No such file or directory',
        r'ENOENT',
        r'path does not exist',
    ],
    'permission_denied': [
        r'PermissionError',
        r'Permission denied',
        r'EACCES',
    ],
    'json_parse': [
        r'JSONDecodeError',
        r'json\.decoder\.JSONDecodeError',
        r'Expecting value',
        r'Invalid JSON',
    ],
    'timeout': [
        r'TimeoutError',
        r'timeout',
        r'Timed out',
        r'deadline exceeded',
    ],
    'memory': [
        r'MemoryError',
        r'out of memory',
        r'OOM',
        r'Cannot allocate memory',
    ],
    'connection': [
        r'ConnectionError',
        r'ConnectionRefusedError',
        r'Connection refused',
        r'ECONNREFUSED',
    ],
    'shell_command': [
        r'command not found',
        r'No such command',
        r'not recognized as',
    ],
    'assertion': [
        r'AssertionError',
    ],
    # Generic catch-alls; must stay last so specific categories win.
    'runtime': [
        r'RuntimeError',
        r'Exception',
        r'Error:',
    ],
}


def classify_failure(output: str) -> tuple[str, str]:
    """
    Classify a failure based on the output string.

    Patterns are matched case-insensitively, in FAILURE_PATTERNS order.

    Args:
        output: Raw tool output text to scan for error signatures.

    Returns:
        (category, specific_error): the matched category and the exact
        matched text (e.g. 'TIMEOUT' for the r'timeout' pattern).
    """
    for category, patterns in FAILURE_PATTERNS.items():
        for pattern in patterns:
            # Search once and reuse the match; the original searched twice.
            if match := re.search(pattern, output, re.IGNORECASE):
                return (category, match.group(0))
    # No known pattern matched; a traceback still indicates a Python error.
    if 'Traceback' in output:
        return ('other_python', 'Unknown Python Error')
    return ('other', 'Unknown Error')
def _parse_failure_from_output(output: str) -> dict | None:
    """Return failure details for a tool-call output, or None on success.

    Structured outputs (JSON dicts) count as failures when
    metadata.exit_code != 0; unstructured outputs count as failures when
    they contain error/traceback markers.
    """
    try:
        output_data = json.loads(output)
    except json.JSONDecodeError:
        # Not JSON: fall back to scanning the raw text for error markers.
        if 'Error' in output or 'error' in output or 'Traceback' in output:
            category, error = classify_failure(output)
            return {
                'exit_code': None,
                'category': category,
                'error': error,
                'output_snippet': output[:300],
            }
        return None
    if isinstance(output_data, dict):
        exit_code = output_data.get('metadata', {}).get('exit_code', 0)
        output_text = output_data.get('output', '')
        if exit_code != 0:
            category, error = classify_failure(output_text)
            return {
                'exit_code': exit_code,
                'category': category,
                'error': error,
                'output_snippet': output_text[:300] if output_text else '',
            }
    # Valid JSON that is not a dict, or a zero exit code: not a failure.
    return None


def extract_tool_calls_from_rollout(rollout_file: Path) -> dict | None:
    """
    Extract all tool calls and their outcomes from a rollout JSONL file.

    Each line is a JSON record; 'function_call' payloads register a tool
    call and 'function_call_output' payloads report its result. Malformed
    lines are skipped.

    Returns:
        None if the file cannot be read, otherwise a dict with:
        - total_tool_calls: int
        - failed_tool_calls: int
        - failures: list of failure details
        - tool_call_counts: dict of tool_name -> count
        - tool_failure_counts: dict of tool_name -> failure_count
    """
    tool_calls: dict[str, dict] = {}  # call_id -> {name, arguments, timestamp}
    total_calls = 0
    failed_calls = 0
    failures = []
    tool_call_counts = defaultdict(int)
    tool_failure_counts = defaultdict(int)
    try:
        with open(rollout_file) as f:
            for line in f:
                try:
                    d = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip malformed JSONL lines
                if d.get('type') != 'response_item':
                    continue
                payload = d.get('payload', {})
                payload_type = payload.get('type', '')
                if payload_type == 'function_call':
                    name = payload.get('name', '')
                    tool_calls[payload.get('call_id', '')] = {
                        'name': name,
                        'arguments': payload.get('arguments', ''),
                        'timestamp': d.get('timestamp', ''),
                    }
                    total_calls += 1
                    tool_call_counts[name] += 1
                elif payload_type == 'function_call_output':
                    call_id = payload.get('call_id', '')
                    failure_info = _parse_failure_from_output(payload.get('output', ''))
                    # Only count failures attributable to a known call_id.
                    if failure_info is not None and call_id in tool_calls:
                        failed_calls += 1
                        tool_name = tool_calls[call_id]['name']
                        tool_failure_counts[tool_name] += 1
                        failures.append({
                            'tool_name': tool_name,
                            'arguments': tool_calls[call_id]['arguments'][:200],
                            'timestamp': tool_calls[call_id]['timestamp'],
                            **failure_info,
                        })
    except Exception as e:
        # Best-effort: a bad file skips the trial rather than aborting the run.
        print(f" Warning: Error reading {rollout_file}: {e}")
        return None
    return {
        'total_tool_calls': total_calls,
        'failed_tool_calls': failed_calls,
        'failures': failures,
        'tool_call_counts': dict(tool_call_counts),
        'tool_failure_counts': dict(tool_failure_counts),
    }
def read_agent_stats(agent_dir: Path) -> dict[str, list[dict]]:
    """
    Collect per-trial tool-call stats for every scenario under an agent dir.

    Handles both flat layouts (agent_dir/Scenario-*) and one extra level of
    domain nesting (agent_dir/sre/Scenario-*, agent_dir/finops/Scenario-*).

    Returns:
        Mapping of scenario_id -> list of per-trial stats dicts.
    """
    def _has_scenarios(path: Path) -> bool:
        # True when the directory directly contains Scenario-* folders.
        return any(c.is_dir() and c.name.startswith("Scenario") for c in path.iterdir())

    if not _has_scenarios(agent_dir):
        # Descend one level: a lone subdirectory, or the first domain
        # subdirectory that actually holds Scenario folders.
        candidates = [c for c in agent_dir.iterdir()
                      if c.is_dir() and not c.name.startswith(".")]
        if len(candidates) == 1:
            agent_dir = candidates[0]
        elif candidates:
            agent_dir = next((c for c in candidates if _has_scenarios(c)), agent_dir)

    results: dict[str, list[dict]] = {}
    for scenario_dir in agent_dir.iterdir():
        if not (scenario_dir.is_dir() and scenario_dir.name.startswith("Scenario")):
            continue
        trial_stats = []
        for trial_dir in sorted(scenario_dir.iterdir()):
            if not trial_dir.is_dir():
                continue
            rollout = find_latest_rollout_file(trial_dir)
            if not rollout:
                continue
            stats = extract_tool_calls_from_rollout(rollout)
            if stats:
                trial_stats.append(stats)
        if trial_stats:
            results[scenario_dir.name] = trial_stats
    return results
def extract_all_data() -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Extract tool failure data for all agents.

    Walks every 'react with code' agent directory, skips models without
    enough qualifying scenarios, then flattens per-trial stats into three
    tables at different granularities.

    Returns:
        - summary_df: Aggregated stats per model
        - detail_df: Per-trial failure stats
        - failures_df: Individual failure details
    """
    agent_dirs = find_react_with_code_dirs(LEADERBOARD_DIR)
    print(f"Found {len(agent_dirs)} 'react with code' agent directories")
    # One record per model / per trial / per individual failure, respectively.
    summary_records = []
    detail_records = []
    failure_records = []
    for agent_dir in tqdm(agent_dirs, desc="Processing agents"):
        model_name = get_model_name(agent_dir.name)
        print(f"\nProcessing: {agent_dir.name}")
        scenario_data = read_agent_stats(agent_dir)
        # min_runs/max_runs are unpacked but unused here.
        n_scenarios, min_runs, max_runs, n_qualifying = get_runs_stats(scenario_data, MIN_RUNS_PER_SCENARIO)
        if n_scenarios == 0:
            print(f" SKIPPING {model_name}: No rollout data found")
            continue
        if n_qualifying < MIN_QUALIFYING_SCENARIOS:
            print(f" SKIPPING {model_name}: Only {n_qualifying}/{n_scenarios} scenarios have {MIN_RUNS_PER_SCENARIO}+ runs")
            continue
        # Filter scenarios: keep only those with MIN_RUNS_PER_SCENARIO+ trials.
        scenario_data = filter_scenarios_with_min_runs(scenario_data, MIN_RUNS_PER_SCENARIO)
        n_scenarios_filtered = len(scenario_data)
        print(f" Processing: {model_name} ({n_scenarios_filtered} scenarios)")
        # Aggregate across all scenarios and trials
        all_total_calls = []
        all_failed_calls = []
        all_failure_rates = []
        aggregated_tool_counts = defaultdict(int)
        aggregated_failure_counts = defaultdict(int)
        aggregated_category_counts = defaultdict(int)
        for scenario_id, trials in tqdm(scenario_data.items(), desc=f" {model_name} scenarios", leave=False):
            for trial_idx, trial in enumerate(trials):
                total = trial['total_tool_calls']
                failed = trial['failed_tool_calls']
                all_total_calls.append(total)
                all_failed_calls.append(failed)
                # Guard against division by zero for trials with no tool calls.
                all_failure_rates.append(failed / total * 100 if total > 0 else 0)
                for tool_name, count in trial['tool_call_counts'].items():
                    aggregated_tool_counts[tool_name] += count
                for tool_name, count in trial['tool_failure_counts'].items():
                    aggregated_failure_counts[tool_name] += count
                # Count failure categories
                for failure in trial['failures']:
                    category = failure.get('category', 'other')
                    aggregated_category_counts[category] += 1
                    # Add to failure records
                    failure_records.append({
                        'model': model_name,
                        'scenario': scenario_id,
                        'trial': trial_idx,
                        'tool_name': failure.get('tool_name', ''),
                        'category': category,
                        'error': failure.get('error', ''),
                        'exit_code': failure.get('exit_code'),
                        'output_snippet': failure.get('output_snippet', '')[:100],
                    })
                detail_records.append({
                    'model': model_name,
                    'scenario': scenario_id,
                    'trial': trial_idx,
                    'total_tool_calls': total,
                    'failed_tool_calls': failed,
                    'failure_rate_pct': failed / total * 100 if total > 0 else 0,
                })
        # Compute per-tool failure rates
        tool_failure_rates = {}
        for tool_name, total in aggregated_tool_counts.items():
            failures = aggregated_failure_counts.get(tool_name, 0)
            tool_failure_rates[tool_name] = {
                'total': total,
                'failures': failures,
                'rate': failures / total * 100 if total > 0 else 0
            }
        summary_records.append({
            'model': model_name,
            'n_scenarios': n_scenarios_filtered,
            'n_trials': len(all_total_calls),
            'total_tool_calls': sum(all_total_calls),
            'total_failed_calls': sum(all_failed_calls),
            'avg_tool_calls_per_trial': np.mean(all_total_calls),
            'avg_failed_calls_per_trial': np.mean(all_failed_calls),
            'avg_failure_rate_pct': np.mean(all_failure_rates),
            'std_failure_rate_pct': np.std(all_failure_rates),
            # Dict-valued columns stringify if round-tripped through CSV
            # (see parse_dict_column below).
            'failure_categories': dict(aggregated_category_counts),
            'tool_failure_rates': tool_failure_rates,
        })
    summary_df = pd.DataFrame(summary_records)
    detail_df = pd.DataFrame(detail_records)
    failures_df = pd.DataFrame(failure_records)
    return summary_df, detail_df, failures_df
def save_data(summary_df: pd.DataFrame, detail_df: pd.DataFrame, failures_df: pd.DataFrame):
    """Write the three extracted tables as CSV files under OUTPUT_DIR."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    # Pair each frame with its destination so writing and reporting share one table.
    targets = [
        (OUTPUT_DIR / "tool_failures_summary.csv", summary_df),
        (OUTPUT_DIR / "tool_failures_detail.csv", detail_df),
        (OUTPUT_DIR / "tool_failures_individual.csv", failures_df),
    ]
    for path, frame in targets:
        frame.to_csv(path, index=False)
    print(f"\nData saved to:")
    for path, _ in targets:
        print(f" - {path}")
def print_summary(summary_df: pd.DataFrame):
    """Print a fixed-width per-model table of tool-call failure statistics."""
    print("\n" + "="*100)
    print("Tool Call Failure Summary")
    print("="*100)
    # Worst failure rate first.
    ranked = summary_df.sort_values("avg_failure_rate_pct", ascending=False)
    print(f"\n{'Model':<20} {'Trials':>8} {'Total Calls':>12} {'Failed':>10} {'Fail Rate':>10} {'Top Category':>20}")
    print("-" * 85)
    for _, row in ranked.iterrows():
        categories = row.get('failure_categories', {})
        # Most frequent failure category for this model, if any were recorded.
        top_cat, top_cat_count = ('-', 0)
        if categories:
            top_cat = max(categories, key=categories.get)
            top_cat_count = categories[top_cat]
        print(f"{row['model']:<20} {row['n_trials']:>8} {row['total_tool_calls']:>12} "
              f"{row['total_failed_calls']:>10} {row['avg_failure_rate_pct']:>9.2f}% "
              f"{top_cat} ({top_cat_count})")
def plot_failure_rate_by_model(summary_df: pd.DataFrame):
    """
    Figure 1: Overall failure rate per model (horizontal bar chart).

    NOTE(review): defined but not invoked from main() in the visible code —
    confirm whether this figure should also be generated.
    """
    plt.rcParams.update(PLOT_PARAMETERS)
    fig, ax = plt.subplots(figsize=(DOUBLE_COLUMN_WIDTH, 3.0))
    # Ascending sort so the highest failure rate ends up at the top bar.
    data = summary_df.sort_values("avg_failure_rate_pct", ascending=True)
    colors = get_color_palette(len(data))
    bars = ax.barh(data["model"], data["avg_failure_rate_pct"],
    color=colors, edgecolor='black', linewidth=0.5)
    # Add error bars for std (drawn separately because barh gets per-bar colors).
    ax.errorbar(data["avg_failure_rate_pct"], range(len(data)),
    xerr=data["std_failure_rate_pct"], fmt='none',
    color='black', capsize=2, linewidth=0.5)
    # Add value labels just beyond the error-bar cap of each bar.
    for i, (bar, val, std) in enumerate(zip(bars, data["avg_failure_rate_pct"], data["std_failure_rate_pct"])):
        ax.text(val + std + 0.5, bar.get_y() + bar.get_height()/2,
        f'{val:.1f}%', va='center', ha='left', fontsize=MIN_FONT_SIZE - 1)
    ax.set_xlabel("Average Failure Rate (%)")
    # Pad the axis so the longest bar's label stays inside the frame.
    ax.set_xlim(0, data["avg_failure_rate_pct"].max() + data["std_failure_rate_pct"].max() + 5)
    plt.title("Tool Call Failure Rate")
    plt.tight_layout()
    plt.show()
    fig.savefig(OUTPUT_DIR / "fig_failure_rate_by_model.png")
    plt.close(fig)
    print("Saved: fig_failure_rate_by_model.png")
def parse_dict_column(col_str):
    """Parse a dictionary column stored as string (e.g. after a CSV round-trip).

    Returns:
        The parsed dict/literal, or {} for NaN, the literal '{}', or any
        value ast.literal_eval cannot parse.
    """
    if pd.isna(col_str) or col_str == '{}':
        return {}
    try:
        return ast.literal_eval(col_str)
    except (ValueError, TypeError, SyntaxError):
        # These are what literal_eval raises for malformed / non-literal
        # input; the previous bare `except:` also hid KeyboardInterrupt.
        return {}
def plot_failure_categories_stacked(summary_df: pd.DataFrame):
    """
    Figure 2: Failure category breakdown per model (stacked bar).

    Expects summary_df['failure_categories'] to hold real dicts (apply
    parse_dict_column first when loading from CSV).
    """
    plt.rcParams.update(PLOT_PARAMETERS)
    fig, ax = plt.subplots(figsize=(DOUBLE_COLUMN_WIDTH, 3.0))
    data = summary_df.copy()
    # (Removed leftover debug print of the failure_categories column.)
    # Total each category across all models to rank categories globally.
    all_categories = defaultdict(int)
    for cats in data['failure_categories']:
        for cat, count in cats.items():
            all_categories[cat] += count
    # Fixed palette so category colors stay comparable across figures.
    CATEGORY_COLORS = {
        'python_syntax': '#e41a1c',
        'python_type': '#377eb8',
        'python_name': '#4daf4a',
        'file_not_found': '#984ea3',
        'json_parse': '#ff7f00',
        'shell_command': '#a65628',
        'timeout': '#f781bf',
        'memory': '#999999',
        'other_python': '#66c2a5',
        'other': '#8da0cb',
    }
    # Keep only the 8 most frequent categories overall.
    sorted_cats = sorted(all_categories.keys(), key=lambda x: all_categories[x], reverse=True)[:8]
    # Stack one segment per category; models with fewest failures at the bottom.
    data = data.sort_values('total_failed_calls', ascending=True)
    bottom = np.zeros(len(data))
    for cat in sorted_cats:
        values = [row['failure_categories'].get(cat, 0) for _, row in data.iterrows()]
        color = CATEGORY_COLORS.get(cat, '#888888')
        ax.barh(data['model'], values, left=bottom,
                label=cat.replace('_', ' ').title(), color=color,
                edgecolor='white', linewidth=0.3)
        bottom += values
    ax.set_xlabel("Number of Failed Tool Calls")
    ax.legend(loc='lower right', ncol=2, fontsize=MIN_FONT_SIZE - 1,
              framealpha=0.9, bbox_to_anchor=(1.0, 0.0))
    plt.title("Tool Failure Category Distribution")
    plt.tight_layout()
    plt.show()
    fig.savefig(OUTPUT_DIR / "fig_failure_categories_stacked.png")
    plt.close(fig)
    print("Saved: fig_failure_categories_stacked.png")
def main():
    """Run the pipeline: extract stats, save CSVs, print summary, plot."""
    print("Extracting tool call failure data for 'react with code' agents...")
    print(f"Reading from directories: {LEADERBOARD_DIR}")
    print(f"Output directory: {OUTPUT_DIR}")
    summary_df, detail_df, failures_df = extract_all_data()
    if summary_df.empty:  # idiomatic emptiness check instead of len(...) == 0
        print("No data extracted!")
        return
    save_data(summary_df, detail_df, failures_df)
    print_summary(summary_df)
    # NOTE(review): plot_failure_rate_by_model is defined above but never
    # invoked — confirm whether Figure 1 should also be generated here.
    plot_failure_categories_stacked(summary_df)


if __name__ == "__main__":
    main()