teamforge / benchmark.py
Your Name
fix: add FastAPI REST endpoints for OpenEnv validator
637f42c
#!/usr/bin/env python3
"""
TeamForge Benchmark Runner
==========================
Evaluates any OpenAI-compatible LLM against all TeamForge tasks and produces
a standardized leaderboard-style report — identical protocol every run.
Usage:
python benchmark.py --model llama3-8b-8192
python benchmark.py --model llama3-70b-8192 --model llama3-8b-8192
python benchmark.py --compare # compare all models in results/
Output:
results/<model>/<task_id>_<timestamp>.json — raw episode data
results/leaderboard.json — aggregate leaderboard
results/leaderboard.md — markdown table (paste into README)
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from openai import OpenAI
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
from rich import box
from environment import TeamForgeEnv
from models import (
Commit, EditFile, GenerateReview, Observation,
PlanStep, RequestIteration, RunLint, RunTests, SelfReflect,
)
from tasks import ALL_TASK_IDS
# Shared Rich console; the print/rule calls in this module all go through it.
console = Console()
# ─── Config ──────────────────────────────────────────────────────────────────
# API credentials and endpoint, both overridable through the environment.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
API_BASE_URL = os.environ.get("API_BASE_URL", "https://api.groq.com/openai/v1")

# Output directory for per-model result files and leaderboard artifacts;
# created at import time if it does not exist yet.
RESULTS_DIR = Path("results")
RESULTS_DIR.mkdir(exist_ok=True)

BENCHMARK_VERSION = "1.0.0"

# Weight of each task in the composite score.
TASK_WEIGHTS = dict(
    easy_bugfix_chunk_list=0.20,
    medium_refactor_stats=0.35,
    hard_lru_cache_performance=0.45,
)
# ─── System prompt (CoT + structured) ────────────────────────────────────────
# System prompt sent as the first message of every completion request.
# The arrows, bullets, "≥" and em dash below were previously stored as
# mis-decoded UTF-8 (mojibake); they are restored to the intended characters.
SYSTEM_PROMPT = """\
You are TeamForge-Agent, an autonomous AI software engineer.
You work through tasks in strict phases:
PLAN → CODE → TEST → LINT → REVIEW → REFLECT → COMMIT
═══ REASONING PROTOCOL ═══
Before each action, reason silently:
OBSERVE: What does the current state tell me?
DIAGNOSE: What is the root cause / what needs doing?
DECIDE: Which action type moves me closer to done?
EXECUTE: Return the action JSON.
═══ ACTION RULES ═══
• NEVER modify files whose path contains "test"
• Always emit ≥2 plan_step actions before any edit_file
• After every edit_file, run_tests before committing
• generate_review must mention specific code details (not generic praise)
• Commit message must follow Conventional Commits: fix/feat/refactor/perf(scope): desc
• Return ONLY valid JSON — no markdown, no explanation outside the JSON
═══ AVAILABLE ACTIONS ═══
{"type":"plan_step", "step_number":1, "description":"...", "estimated_effort":"low|medium|high"}
{"type":"edit_file", "file_path":"...", "content":"<full file>", "reason":"..."}
{"type":"run_tests", "timeout_seconds":30}
{"type":"run_lint", "fix":false}
{"type":"generate_review", "focus_areas":["correctness","performance"], "review_text":"..."}
{"type":"commit", "message":"fix(scope): ..."}
{"type":"self_reflect", "what_went_well":"...", "what_to_improve":"..."}
{"type":"request_iteration","reason":"...", "target_issues":["..."]}
"""
# ─── Agent ───────────────────────────────────────────────────────────────────
class BenchmarkAgent:
    """LLM-backed agent that turns environment observations into actions.

    Each call to :meth:`act` renders the observation as a text prompt, sends
    it (together with the system prompt and the most recent conversation
    turns) to the chat-completions endpoint of ``client``, and parses the
    reply into one of the action model classes.
    """

    # Maximum number of completion requests issued per call to ``act``.
    _MAX_ATTEMPTS = 3
    # Number of trailing history messages included in each request.
    _HISTORY_WINDOW = 12

    def __init__(self, client: OpenAI, model: str):
        self.client = client
        self.model = model
        self.history: List[Dict] = []
        self._consecutive_failures = 0
        # Most recent exception raised while requesting a completion, kept so
        # callers can diagnose why ``act`` returned None.
        self.last_error: Optional[Exception] = None

    def reset(self):
        """Clear conversation history and failure bookkeeping."""
        self.history = []
        self._consecutive_failures = 0
        self.last_error = None

    def act(self, obs: Observation) -> Optional[Any]:
        """Ask the model for the next action.

        Returns the parsed action object, or None when no attempt produced a
        parseable action (in which case ``_consecutive_failures`` is
        incremented).
        """
        self.history.append({"role": "user", "content": self._obs_to_prompt(obs)})
        for attempt in range(self._MAX_ATTEMPTS):
            try:
                resp = self.client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        *self.history[-self._HISTORY_WINDOW:],
                    ],
                    temperature=0.15,
                    max_tokens=1800,
                )
                content = resp.choices[0].message.content
                if not content:
                    # Nothing to parse (content may be None); retry without
                    # polluting the history with an empty assistant turn.
                    continue
                raw = content.strip()
                self.history.append({"role": "assistant", "content": raw})
                action = self._parse(raw)
                if action is not None:
                    self._consecutive_failures = 0
                    return action
            except Exception as exc:
                # Best-effort boundary around the remote call: remember the
                # error and back off, but do not sleep after the last attempt.
                self.last_error = exc
                if attempt < self._MAX_ATTEMPTS - 1:
                    time.sleep(1.5 ** attempt)
        self._consecutive_failures += 1
        return None

    def _obs_to_prompt(self, obs: Observation) -> str:
        """Render an observation as the user-message text sent to the model."""
        parts = [
            f"TASK: {obs.task_id} | STEP {obs.step_number}/{obs.max_steps} | PHASE: {obs.phase.value}",
            f"CUMULATIVE_REWARD: {obs.cumulative_reward:.3f}",
            f"\n## TASK\n{obs.task_description[:500]}",
        ]
        if obs.last_action_type:
            parts.append(f"\n## LAST ACTION: {obs.last_action_type} → {obs.last_action_status.value}")
            parts.append(f"```\n{obs.last_action_output[:600]}\n```")
        if obs.test_results:
            t = obs.test_results
            parts.append(f"\n## TESTS: {t.passed} passed / {t.failed} failed / {t.errors} errors")
            if t.failed or t.errors:
                # Only the tail of the test output is included.
                parts.append(f"```\n{t.output[-600:]}\n```")
        if obs.lint_results and obs.lint_results.violations:
            parts.append(f"\n## LINT: {obs.lint_results.violations} violations\n```\n{obs.lint_results.output[:400]}\n```")
        parts.append("\n## REPO FILES")
        # At most 8 files are shown, and only those under 5000 bytes, each
        # truncated to 1000 characters.
        for f in obs.repo_files[:8]:
            if f.size_bytes < 5000:
                parts.append(f"\n### {f.path}\n```python\n{f.content[:1000]}\n```")
        if obs.plan:
            parts.append(f"\n## PLAN ({len(obs.plan)} steps)")
            for s in obs.plan:
                parts.append(f" {s.step_number}. [{s.estimated_effort}] {s.description}")
        parts.append("\n## NEXT ACTION (JSON only):")
        return "\n".join(parts)

    @staticmethod
    def _extract_json(text: str) -> Optional[Dict[str, Any]]:
        """Return the JSON object contained in a model reply, or None.

        The reply is first tried as a whole after removing a surrounding
        Markdown code fence; if that does not yield an object, the span from
        the first ``{`` to the last ``}`` is tried.
        """
        import re

        cleaned = text.strip()
        if cleaned.startswith("```"):
            # Drop the fence and an optional "json" language tag as a prefix
            # (str.lstrip("```json") would strip a *set* of characters).
            cleaned = cleaned.lstrip("`")
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
            cleaned = cleaned.rstrip("`").strip()

        candidates = [cleaned]
        match = re.search(r"\{.*\}", cleaned, re.DOTALL)
        if match and match.group() != cleaned:
            candidates.append(match.group())

        for candidate in candidates:
            try:
                data = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(data, dict):
                return data
        return None

    def _parse(self, text: str) -> Optional[Any]:
        """Convert a raw model reply into an action object, or None."""
        data = self._extract_json(text)
        if data is None:
            return None
        action_type = data.get("type")
        if not isinstance(action_type, str):
            return None

        from models import PlanStep, EditFile, RunTests, RunLint, GenerateReview, Commit, SelfReflect, RequestIteration
        dispatch = {
            "plan_step": PlanStep, "edit_file": EditFile, "run_tests": RunTests,
            "run_lint": RunLint, "generate_review": GenerateReview, "commit": Commit,
            "self_reflect": SelfReflect, "request_iteration": RequestIteration,
        }
        cls = dispatch.get(action_type)
        if cls is None:
            return None
        try:
            return cls(**data)
        except Exception:
            # The model classes live outside this module; any construction or
            # validation failure means the reply is not a usable action.
            return None
# ─── Episode runner ───────────────────────────────────────────────────────────
def run_episode(
    env: TeamForgeEnv,
    agent: BenchmarkAgent,
    task_id: str,
) -> Dict[str, Any]:
    """Drive one agent/environment episode for ``task_id`` and summarize it.

    The loop stops when the observation reports ``done`` or when the agent
    fails to produce an action. The returned dict carries episode metadata,
    a per-step log, and the scores reported by ``env.grade()``.
    """
    agent.reset()
    obs = env.reset(task_id)
    trace: List[Dict[str, Any]] = []
    episode_started = time.perf_counter()

    while True:
        if obs.done:
            break

        # Time only the agent's decision, not the environment step.
        call_started = time.perf_counter()
        action = agent.act(obs)
        latency = time.perf_counter() - call_started

        if action is None:
            break

        obs = env.step(action)
        tests = obs.test_results
        lint = obs.lint_results
        entry = {
            "step": obs.step_number,
            "action": obs.last_action_type,
            "status": obs.last_action_status.value,
            "reward": obs.reward,
            "cum_reward": obs.cumulative_reward,
            "latency_s": round(latency, 3),
            "tests_passed": tests.passed if tests else 0,
            "tests_failed": tests.failed if tests else 0,
            "lint_violations": lint.violations if lint else None,
        }
        trace.append(entry)

    elapsed = time.perf_counter() - episode_started
    grade = env.grade()

    score_fields = (
        "test_pass_rate",
        "lint_score",
        "efficiency_score",
        "review_quality",
        "reflection_quality",
        "final_score",
        "passed",
    )
    scores = {field: getattr(grade, field) for field in score_fields}

    summary = {
        "task_id": task_id,
        "model": agent.model,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "elapsed_s": round(elapsed, 2),
        "total_steps": obs.step_number,
        "done": obs.done,
        "steps_log": trace,
        "scores": scores,
    }
    return summary
# ─── Leaderboard ─────────────────────────────────────────────────────────────
def compute_teamforge_score(results_by_task: Dict[str, Dict]) -> float:
    """Return the weighted sum of per-task final scores, rounded to 4 places.

    Each task's ``scores["final_score"]`` is multiplied by its entry in
    ``TASK_WEIGHTS``; tasks without a weight contribute nothing.
    """
    weighted_total = 0
    for task_id, result in results_by_task.items():
        weight = TASK_WEIGHTS.get(task_id, 0)
        weighted_total += weight * result["scores"]["final_score"]
    return round(weighted_total, 4)
def build_leaderboard() -> List[Dict]:
    """Scan ``RESULTS_DIR`` and aggregate one leaderboard row per model.

    Every sub-directory is treated as one model. For each task the run with
    the highest ``final_score`` is kept (the first such run in sorted file
    order on ties). Result files that cannot be read, are not valid JSON, or
    lack the fields the leaderboard needs are skipped with a console note
    instead of being silently ignored or crashing the aggregation later.

    Returns:
        Rows sorted by ``teamforge_score`` in descending order.
    """
    rows = []
    # Sorted iteration keeps tie-breaking independent of filesystem order.
    for model_dir in sorted(RESULTS_DIR.iterdir()):
        if not model_dir.is_dir():
            continue

        best: Dict[str, Dict] = {}
        for result_file in sorted(model_dir.glob("*.json")):
            try:
                data = json.loads(result_file.read_text(encoding="utf-8"))
                task_id = data["task_id"]
                final_score = data["scores"]["final_score"]
                total_steps = data["total_steps"]
                if (
                    not isinstance(task_id, str)
                    or not isinstance(final_score, (int, float))
                    or not isinstance(total_steps, (int, float))
                ):
                    raise TypeError("unexpected field types")
            except (OSError, ValueError, KeyError, TypeError) as exc:
                console.print(
                    f"Skipping unusable result file {result_file}: {exc!r}",
                    style="dim",
                    markup=False,
                )
                continue

            current = best.get(task_id)
            if current is None or final_score > current["scores"]["final_score"]:
                best[task_id] = data

        if not best:
            continue

        rows.append({
            "model": model_dir.name,
            "teamforge_score": compute_teamforge_score(best),
            "tasks": {tid: run["scores"] for tid, run in best.items()},
            "avg_steps": round(
                sum(run["total_steps"] for run in best.values()) / max(1, len(best)), 1
            ),
        })

    rows.sort(key=lambda r: r["teamforge_score"], reverse=True)
    return rows
def save_leaderboard(rows: List[Dict]) -> None:
    """Write the leaderboard rows to ``leaderboard.json`` and ``leaderboard.md``.

    Both files are written inside ``RESULTS_DIR``. The Markdown file holds a
    heading followed by one table row per leaderboard entry.
    """
    json_path = RESULTS_DIR / "leaderboard.json"
    json_path.write_text(json.dumps(rows, indent=2))

    # Markdown table: heading, header row, alignment row, then the entries.
    markdown = [
        "## TeamForge Leaderboard\n",
        "| Rank | Model | TeamForge Score | Easy | Medium | Hard | Avg Steps |",
        "|------|-------|:--------------:|:----:|:------:|:----:|:---------:|",
    ]
    task_columns = (
        "easy_bugfix_chunk_list",
        "medium_refactor_stats",
        "hard_lru_cache_performance",
    )
    rank = 0
    for row in rows:
        rank += 1
        per_task = row["tasks"]
        # A task the model never ran is reported as 0.
        easy, medium, hard = (
            per_task.get(task, {}).get("final_score", 0) for task in task_columns
        )
        markdown.append(
            "| {} | `{}` | **{:.4f}** | {:.3f} | {:.3f} | {:.3f} | {} |".format(
                rank, row["model"], row["teamforge_score"],
                easy, medium, hard, row["avg_steps"],
            )
        )

    md_path = RESULTS_DIR / "leaderboard.md"
    md_path.write_text("\n".join(markdown))
# ─── Rich display ─────────────────────────────────────────────────────────────
def print_episode_live(task_id: str, model: str, episode: Dict) -> None:
    """Print a finished episode: a per-step table in a panel, then score bars.

    Args:
        task_id: Task identifier shown in the panel title.
        model: Model name shown in the panel title.
        episode: Episode dict with ``scores``, ``steps_log``, ``total_steps``
            and ``elapsed_s`` entries.
    """
    scores = episode["scores"]
    log = episode["steps_log"]

    # Step table
    step_table = Table(box=box.SIMPLE, show_header=True, header_style="bold cyan")
    step_table.add_column("Step", justify="right", width=5)
    step_table.add_column("Action", width=20)
    step_table.add_column("Status", width=9)
    step_table.add_column("Reward", justify="right", width=8)
    step_table.add_column("Cum Reward", justify="right", width=10)
    step_table.add_column("Tests", justify="right", width=9)
    for s in log:
        status_style = "green" if s["status"] == "success" else "red"
        # Small negative rewards are shown dimmed rather than red.
        reward_style = "green" if s["reward"] > 0 else ("red" if s["reward"] < -0.05 else "dim")
        step_table.add_row(
            str(s["step"]),
            s["action"],
            f"[{status_style}]{s['status']}[/{status_style}]",
            f"[{reward_style}]{s['reward']:+.4f}[/{reward_style}]",
            f"{s['cum_reward']:+.4f}",
            f"{s['tests_passed']}p / {s['tests_failed']}f",
        )

    # Score panel
    score_table = Table(box=box.SIMPLE, show_header=False)
    score_table.add_column("Metric", style="bold")
    score_table.add_column("Score", justify="right")
    score_table.add_column("Bar", width=20)

    def bar(v: float) -> str:
        """Render ``v`` as a 20-cell bar, clamped so it never over/underflows."""
        filled = max(0, min(20, int(v * 20)))
        color = "green" if v >= 0.8 else ("yellow" if v >= 0.5 else "red")
        return f"[{color}]{'█' * filled}{'░' * (20 - filled)}[/{color}]"

    score_rows = [
        ("Tests Passed", scores["test_pass_rate"]),
        ("Lint Clean", scores["lint_score"]),
        ("Efficiency", scores["efficiency_score"]),
        ("Review Quality", scores["review_quality"]),
        ("Reflection", scores["reflection_quality"]),
    ]
    for label, value in score_rows:
        score_table.add_row(label, f"{value:.4f}", bar(value))

    passed_str = "[bold green]✓ PASSED[/bold green]" if scores["passed"] else "[bold red]✗ FAILED[/bold red]"
    console.print(Panel(
        step_table,
        # The opening bracket is escaped so Rich prints "[model]" literally
        # instead of parsing the model name as a markup tag and dropping it.
        title=f"[bold]{task_id}[/bold] \\[{model}]",
        subtitle=f"Final: [bold yellow]{scores['final_score']:.4f}[/bold yellow] {passed_str} "
                 f"Steps: {episode['total_steps']} Time: {episode['elapsed_s']:.1f}s",
        border_style="blue",
    ))
    console.print(score_table)
def print_leaderboard(rows: List[Dict]) -> None:
    """Print the leaderboard rows as a Rich table.

    The first three rows get medal emojis; later rows show ``#<rank>``.
    A task missing from a row's ``tasks`` mapping is displayed as 0.
    """
    table = Table(
        title="[bold]TeamForge Leaderboard[/bold]",
        box=box.DOUBLE_EDGE,
        show_header=True,
        header_style="bold magenta",
        border_style="magenta",
    )
    table.add_column("Rank", justify="center", width=5)
    table.add_column("Model", width=28)
    table.add_column("TeamForge Score", justify="center", width=16)
    table.add_column("Easy (20%)", justify="center", width=11)
    table.add_column("Medium (35%)", justify="center", width=12)
    table.add_column("Hard (45%)", justify="center", width=11)
    table.add_column("Avg Steps", justify="center", width=10)

    # Previously stored as mis-decoded UTF-8; restored to the medal emojis.
    medals = ["🥇", "🥈", "🥉"]
    for i, r in enumerate(rows):
        medal = medals[i] if i < len(medals) else f"#{i+1}"
        easy = r["tasks"].get("easy_bugfix_chunk_list", {}).get("final_score", 0)
        medium = r["tasks"].get("medium_refactor_stats", {}).get("final_score", 0)
        hard = r["tasks"].get("hard_lru_cache_performance", {}).get("final_score", 0)
        ts = r["teamforge_score"]
        ts_color = "green" if ts >= 0.80 else ("yellow" if ts >= 0.55 else "red")
        table.add_row(
            medal,
            f"[bold]{r['model']}[/bold]",
            f"[{ts_color}]{ts:.4f}[/{ts_color}]",
            f"{easy:.3f}",
            f"{medium:.3f}",
            f"{hard:.3f}",
            str(r["avg_steps"]),
        )
    console.print(table)
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    With ``--compare`` only the leaderboard is rebuilt from saved results and
    printed. Otherwise every selected model is run on every selected task,
    each episode is written to ``RESULTS_DIR/<model>/`` and displayed, and
    the leaderboard is rebuilt at the end.
    """
    parser = argparse.ArgumentParser(description="TeamForge Benchmark Runner")
    parser.add_argument("--model", action="append", dest="models",
                        default=None, help="Model name (repeat for multiple)")
    # Unpacking works whether ALL_TASK_IDS is a list or another sequence.
    parser.add_argument("--task", choices=[*ALL_TASK_IDS, "all"], default="all")
    parser.add_argument("--compare", action="store_true",
                        help="Only rebuild + print leaderboard from saved results")
    args = parser.parse_args()

    if args.compare:
        rows = build_leaderboard()
        save_leaderboard(rows)
        print_leaderboard(rows)
        return

    models = args.models or ["llama3-8b-8192"]
    task_ids = list(ALL_TASK_IDS) if args.task == "all" else [args.task]

    console.rule("[bold blue]TeamForge Benchmark[/bold blue]")
    console.print(f"[dim]Version {BENCHMARK_VERSION} | {len(models)} model(s) × {len(task_ids)} task(s)[/dim]\n")

    if not GROQ_API_KEY:
        # The agent swallows request errors, so a missing key would otherwise
        # only show up as episodes with zero steps.
        console.print("[yellow]Warning: GROQ_API_KEY is not set; API requests will likely fail.[/yellow]")

    client = OpenAI(api_key=GROQ_API_KEY, base_url=API_BASE_URL)

    for model in models:
        # "/" in a model name would otherwise create nested directories.
        model_dir = RESULTS_DIR / model.replace("/", "_")
        model_dir.mkdir(exist_ok=True)
        agent = BenchmarkAgent(client, model)
        env = TeamForgeEnv()
        console.print(f"\n[bold cyan]▶ Model: {model}[/bold cyan]")
        try:
            for task_id in task_ids:
                with Progress(
                    SpinnerColumn(),
                    TextColumn(f"[bold]{task_id}[/bold]"),
                    BarColumn(),
                    TimeElapsedColumn(),
                    console=console,
                    transient=True,
                ) as progress:
                    progress.add_task("running", total=None)
                    episode = run_episode(env, agent, task_id)

                # Save raw result
                ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
                out_file = model_dir / f"{task_id}_{ts_str}.json"
                out_file.write_text(json.dumps(episode, indent=2))
                print_episode_live(task_id, model, episode)
        finally:
            # Tear the sandbox down even if an episode raised.
            env._sandbox.teardown()

    # Rebuild leaderboard
    rows = build_leaderboard()
    save_leaderboard(rows)
    console.print()
    console.rule("[bold magenta]Leaderboard[/bold magenta]")
    print_leaderboard(rows)
    console.print(f"\n[dim]Full results → {RESULTS_DIR}/leaderboard.json[/dim]")
    console.print(f"[dim]Markdown table → {RESULTS_DIR}/leaderboard.md[/dim]")
# Script entry point. NOTE: when this file is run as a script, main() executes
# here, before the definitions that follow this guard have been bound;
# main() does not reference them.
if __name__ == "__main__":
    main()
# ─── Failure Mode Analyzer ────────────────────────────────────────────────────
def analyze_failures(episodes: list[dict]) -> dict:
    """
    Count how many episodes exhibit each known failure mode.

    Each episode is inspected through its ``steps_log``, ``scores`` and
    ``done`` entries. Returns ``{failure_type: count}`` containing only the
    modes that occurred at least once.
    """
    counts = dict.fromkeys(
        (
            "test_file_modified",
            "max_steps_exceeded",
            "zero_tests_passed",
            "lint_never_run",
            "no_review_written",
            "no_plan_written",
            "immediate_edit_no_plan",
        ),
        0,
    )
    # Action that must appear at least once -> mode counted when it is absent.
    required_actions = (
        ("run_lint", "lint_never_run"),
        ("generate_review", "no_review_written"),
        ("plan_step", "no_plan_written"),
    )

    for episode in episodes:
        steps = episode.get("steps_log", [])
        scores = episode.get("scores", {})
        actions = [step["action"] for step in steps]

        if not episode.get("done"):
            counts["max_steps_exceeded"] += 1
        if scores.get("test_pass_rate", 1) == 0:
            counts["zero_tests_passed"] += 1

        for action_name, mode in required_actions:
            if action_name not in actions:
                counts[mode] += 1

        # 999 stands in for "never happened" so the comparison stays simple.
        first_edit = actions.index("edit_file") if "edit_file" in actions else 999
        first_plan = actions.index("plan_step") if "plan_step" in actions else 999
        if first_edit < first_plan:
            counts["immediate_edit_no_plan"] += 1

        # An edit_file step with reward below -0.25 is taken to mean a test
        # file was modified; counted at most once per episode.
        if any(step["action"] == "edit_file" and step["reward"] < -0.25 for step in steps):
            counts["test_file_modified"] += 1

    return {mode: count for mode, count in counts.items() if count}
def print_failure_analysis(episodes: list[dict], model: str) -> None:
    """Print a table of the failure modes found in ``episodes`` for ``model``.

    Modes are listed by descending count, each with a short implication
    text. When no failure mode occurred, a single confirmation line is
    printed instead of a table.
    """
    modes = analyze_failures(episodes)
    if not modes:
        console.print(f" [green]No failure modes detected for {model}[/green]")
        return

    table = Table(
        # The dash was previously stored as mis-decoded UTF-8 (mojibake).
        title=f"[bold red]Failure Mode Analysis — {model}[/bold red]",
        box=box.SIMPLE,
        show_header=True,
        header_style="bold red",
    )
    table.add_column("Failure Mode", width=30)
    table.add_column("Count", justify="right", width=7)
    table.add_column("Implication", width=40)

    implications = {
        "test_file_modified": "Agent tried to cheat — severe penalty applied",
        "max_steps_exceeded": "Agent ran out of steps without completing",
        "zero_tests_passed": "Implementation entirely wrong / syntax error",
        "lint_never_run": "Agent skipped code quality check",
        "no_review_written": "Agent skipped review phase",
        "no_plan_written": "Agent dove straight into coding without planning",
        "immediate_edit_no_plan": "First edit came before first plan step",
    }
    for mode, count in sorted(modes.items(), key=lambda x: -x[1]):
        table.add_row(
            mode.replace("_", " "),
            str(count),
            implications.get(mode, ""),
        )
    console.print(table)