#!/usr/bin/env python3
"""
TeamForge Benchmark Runner
==========================

Evaluates any OpenAI-compatible LLM against all TeamForge tasks and produces
a standardized leaderboard-style report: the protocol is identical on every run.

Usage:
    python benchmark.py --model llama3-8b-8192
    python benchmark.py --model llama3-70b-8192 --model llama3-8b-8192
    python benchmark.py --compare   # compare all models in results/

Output:
    results/<model>/<task>_<timestamp>.json  → raw episode data
    results/leaderboard.json                 → aggregate leaderboard
    results/leaderboard.md                   → markdown table (paste into README)
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from openai import OpenAI
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
from rich import box

from environment import TeamForgeEnv
from models import (
    Commit, EditFile, GenerateReview, Observation,
    PlanStep, RequestIteration, RunLint, RunTests, SelfReflect,
)
from tasks import ALL_TASK_IDS

console = Console()


# ─── Config ──────────────────────────────────────────────────────────────────

GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.groq.com/openai/v1")

RESULTS_DIR = Path("results")
RESULTS_DIR.mkdir(exist_ok=True)

BENCHMARK_VERSION = "1.0.0"

TASK_WEIGHTS = {
    "easy_bugfix_chunk_list": 0.20,
    "medium_refactor_stats": 0.35,
    "hard_lru_cache_performance": 0.45,
}
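
# The weights are intended to sum to 1.0 so the composite TeamForge score stays
# in [0, 1]; this assertion is an added sanity guard (not part of the original
# protocol) to catch accidental edits to the table above.
assert abs(sum(TASK_WEIGHTS.values()) - 1.0) < 1e-9, "TASK_WEIGHTS must sum to 1.0"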


# ─── System prompt (CoT + structured) ────────────────────────────────────────

SYSTEM_PROMPT = """\
You are TeamForge-Agent, an autonomous AI software engineer.
You work through tasks in strict phases:
PLAN → CODE → TEST → LINT → REVIEW → REFLECT → COMMIT

─── REASONING PROTOCOL ───
Before each action, reason silently:
OBSERVE:  What does the current state tell me?
DIAGNOSE: What is the root cause / what needs doing?
DECIDE:   Which action type moves me closer to done?
EXECUTE:  Return the action JSON.

─── ACTION RULES ───
• NEVER modify files whose path contains "test"
• Always emit ≥2 plan_step actions before any edit_file
• After every edit_file, run_tests before committing
• generate_review must mention specific code details (not generic praise)
• Commit message must follow Conventional Commits: fix/feat/refactor/perf(scope): desc
• Return ONLY valid JSON; no markdown, no explanation outside the JSON

─── AVAILABLE ACTIONS ───
{"type":"plan_step", "step_number":1, "description":"...", "estimated_effort":"low|medium|high"}
{"type":"edit_file", "file_path":"...", "content":"<full file>", "reason":"..."}
{"type":"run_tests", "timeout_seconds":30}
{"type":"run_lint", "fix":false}
{"type":"generate_review", "focus_areas":["correctness","performance"], "review_text":"..."}
{"type":"commit", "message":"fix(scope): ..."}
{"type":"self_reflect", "what_went_well":"...", "what_to_improve":"..."}
{"type":"request_iteration","reason":"...", "target_issues":["..."]}
"""


# ─── Agent ───────────────────────────────────────────────────────────────────

class BenchmarkAgent:
    def __init__(self, client: OpenAI, model: str):
        self.client = client
        self.model = model
        self.history: List[Dict] = []
        self._consecutive_failures = 0

    def reset(self):
        self.history = []
        self._consecutive_failures = 0

    def act(self, obs: Observation) -> Optional[Any]:
        self.history.append({"role": "user", "content": self._obs_to_prompt(obs)})
        # Up to three attempts per observation; transient API errors back off
        # exponentially before the next try.
        for attempt in range(3):
            try:
                resp = self.client.chat.completions.create(
                    model=self.model,
                    # Keep the context bounded: system prompt + the last 12 turns.
                    messages=[{"role": "system", "content": SYSTEM_PROMPT}, *self.history[-12:]],
                    temperature=0.15,
                    max_tokens=1800,
                )
                raw = resp.choices[0].message.content.strip()
                self.history.append({"role": "assistant", "content": raw})
                action = self._parse(raw)
                if action:
                    self._consecutive_failures = 0
                    return action
            except Exception:
                time.sleep(1.5 ** attempt)
                self._consecutive_failures += 1
        return None

    def _obs_to_prompt(self, obs: Observation) -> str:
        parts = [
            f"TASK: {obs.task_id} | STEP {obs.step_number}/{obs.max_steps} | PHASE: {obs.phase.value}",
            f"CUMULATIVE_REWARD: {obs.cumulative_reward:.3f}",
            f"\n## TASK\n{obs.task_description[:500]}",
        ]
        if obs.last_action_type:
            parts.append(f"\n## LAST ACTION: {obs.last_action_type} → {obs.last_action_status.value}")
            parts.append(f"```\n{obs.last_action_output[:600]}\n```")
        if obs.test_results:
            t = obs.test_results
            parts.append(f"\n## TESTS: {t.passed} passed / {t.failed} failed / {t.errors} errors")
            if t.failed or t.errors:
                parts.append(f"```\n{t.output[-600:]}\n```")
        if obs.lint_results and obs.lint_results.violations:
            parts.append(f"\n## LINT: {obs.lint_results.violations} violations\n```\n{obs.lint_results.output[:400]}\n```")
        parts.append("\n## REPO FILES")
        for f in obs.repo_files[:8]:
            if f.size_bytes < 5000:
                parts.append(f"\n### {f.path}\n```python\n{f.content[:1000]}\n```")
        if obs.plan:
            parts.append(f"\n## PLAN ({len(obs.plan)} steps)")
            for s in obs.plan:
                parts.append(f"  {s.step_number}. [{s.estimated_effort}] {s.description}")
        parts.append("\n## NEXT ACTION (JSON only):")
        return "\n".join(parts)

    def _parse(self, text: str) -> Optional[Any]:
        text = text.strip()
        # Strip a Markdown code fence if the model wrapped its JSON in one.
        if text.startswith("```"):
            text = text.removeprefix("```json").removeprefix("```")
            text = text.removesuffix("```").strip()
        dispatch = {
            "plan_step": PlanStep, "edit_file": EditFile, "run_tests": RunTests,
            "run_lint": RunLint, "generate_review": GenerateReview, "commit": Commit,
            "self_reflect": SelfReflect, "request_iteration": RequestIteration,
        }
        try:
            data = json.loads(text)
            cls = dispatch.get(data.get("type", ""))
            return cls(**data) if cls else None
        except Exception:
            # Fall back to extracting the first {...} block from the response.
            m = re.search(r"\{.*\}", text, re.DOTALL)
            if m:
                try:
                    data = json.loads(m.group())
                    cls = dispatch.get(data.get("type", ""))
                    return cls(**data) if cls else None
                except Exception:
                    pass
        return None
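

# Illustrative behaviour of BenchmarkAgent._parse (not an exhaustive spec): a
# response such as '```json\n{"type": "run_tests", "timeout_seconds": 30}\n```'
# parses to RunTests(timeout_seconds=30); a response with no recognizable JSON
# object or an unknown "type" yields None, which act() treats as a failed attempt.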


# ─── Episode runner ──────────────────────────────────────────────────────────

def run_episode(
    env: TeamForgeEnv,
    agent: BenchmarkAgent,
    task_id: str,
) -> Dict[str, Any]:
    agent.reset()
    obs = env.reset(task_id)
    steps_log = []
    start_time = time.perf_counter()

    while not obs.done:
        t0 = time.perf_counter()
        action = agent.act(obs)
        latency = time.perf_counter() - t0
        if action is None:
            break
        obs = env.step(action)
        steps_log.append({
            "step": obs.step_number,
            "action": obs.last_action_type,
            "status": obs.last_action_status.value,
            "reward": obs.reward,
            "cum_reward": obs.cumulative_reward,
            "latency_s": round(latency, 3),
            "tests_passed": obs.test_results.passed if obs.test_results else 0,
            "tests_failed": obs.test_results.failed if obs.test_results else 0,
            "lint_violations": obs.lint_results.violations if obs.lint_results else None,
        })

    elapsed = time.perf_counter() - start_time
    result = env.grade()
    return {
        "task_id": task_id,
        "model": agent.model,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "elapsed_s": round(elapsed, 2),
        "total_steps": obs.step_number,
        "done": obs.done,
        "steps_log": steps_log,
        "scores": {
            "test_pass_rate": result.test_pass_rate,
            "lint_score": result.lint_score,
            "efficiency_score": result.efficiency_score,
            "review_quality": result.review_quality,
            "reflection_quality": result.reflection_quality,
            "final_score": result.final_score,
            "passed": result.passed,
        },
    }


# ─── Leaderboard ─────────────────────────────────────────────────────────────

def compute_teamforge_score(results_by_task: Dict[str, Dict]) -> float:
    """Weighted composite score across all tasks (0.0-1.0)."""
    total = sum(
        TASK_WEIGHTS.get(tid, 0) * r["scores"]["final_score"]
        for tid, r in results_by_task.items()
    )
    return round(total, 4)
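
# Worked example with illustrative numbers: final scores of 0.9 (easy),
# 0.7 (medium) and 0.5 (hard) give 0.20*0.9 + 0.35*0.7 + 0.45*0.5 = 0.65.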


def build_leaderboard() -> List[Dict]:
    """Scan results/ and aggregate one row per model."""
    rows = []
    for model_dir in RESULTS_DIR.iterdir():
        if not model_dir.is_dir():
            continue
        model_runs: Dict[str, List] = {}
        for f in model_dir.glob("*.json"):
            try:
                data = json.loads(f.read_text())
                tid = data["task_id"]
                model_runs.setdefault(tid, []).append(data)
            except Exception:
                pass
        if not model_runs:
            continue
        # Best run per task
        best = {}
        for tid, runs in model_runs.items():
            best[tid] = max(runs, key=lambda r: r["scores"]["final_score"])
        ts = compute_teamforge_score(best)
        rows.append({
            "model": model_dir.name,
            "teamforge_score": ts,
            "tasks": {
                tid: r["scores"] for tid, r in best.items()
            },
            "avg_steps": round(
                sum(r["total_steps"] for r in best.values()) / max(1, len(best)), 1
            ),
        })
    rows.sort(key=lambda r: r["teamforge_score"], reverse=True)
    return rows


def save_leaderboard(rows: List[Dict]) -> None:
    (RESULTS_DIR / "leaderboard.json").write_text(json.dumps(rows, indent=2))

    # Markdown table
    lines = [
        "## TeamForge Leaderboard\n",
        "| Rank | Model | TeamForge Score | Easy | Medium | Hard | Avg Steps |",
        "|------|-------|:--------------:|:----:|:------:|:----:|:---------:|",
    ]
    for i, r in enumerate(rows, 1):
        easy = r["tasks"].get("easy_bugfix_chunk_list", {}).get("final_score", 0)
        medium = r["tasks"].get("medium_refactor_stats", {}).get("final_score", 0)
        hard = r["tasks"].get("hard_lru_cache_performance", {}).get("final_score", 0)
        lines.append(
            f"| {i} | `{r['model']}` | **{r['teamforge_score']:.4f}** "
            f"| {easy:.3f} | {medium:.3f} | {hard:.3f} | {r['avg_steps']} |"
        )
    (RESULTS_DIR / "leaderboard.md").write_text("\n".join(lines))
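
# With hypothetical scores (easy 0.950, medium 0.800, hard 0.760 → composite
# 0.8120) and a hypothetical avg_steps of 14.3, a rendered leaderboard.md row is:
# | 1 | `llama3-70b-8192` | **0.8120** | 0.950 | 0.800 | 0.760 | 14.3 |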


# ─── Rich display ────────────────────────────────────────────────────────────

def print_episode_live(task_id: str, model: str, episode: Dict) -> None:
    scores = episode["scores"]
    log = episode["steps_log"]

    # Step table
    step_table = Table(box=box.SIMPLE, show_header=True, header_style="bold cyan")
    step_table.add_column("Step", justify="right", width=5)
    step_table.add_column("Action", width=20)
    step_table.add_column("Status", width=9)
    step_table.add_column("Reward", justify="right", width=8)
    step_table.add_column("Cum Reward", justify="right", width=10)
    step_table.add_column("Tests", justify="right", width=9)
    for s in log:
        status_style = "green" if s["status"] == "success" else "red"
        reward_style = "green" if s["reward"] > 0 else ("red" if s["reward"] < -0.05 else "dim")
        step_table.add_row(
            str(s["step"]),
            s["action"],
            f"[{status_style}]{s['status']}[/{status_style}]",
            f"[{reward_style}]{s['reward']:+.4f}[/{reward_style}]",
            f"{s['cum_reward']:+.4f}",
            f"{s['tests_passed']}p / {s['tests_failed']}f",
        )

    # Score panel
    score_table = Table(box=box.SIMPLE, show_header=False)
    score_table.add_column("Metric", style="bold")
    score_table.add_column("Score", justify="right")
    score_table.add_column("Bar", width=20)

    def bar(v: float) -> str:
        filled = int(v * 20)
        color = "green" if v >= 0.8 else ("yellow" if v >= 0.5 else "red")
        return f"[{color}]{'█' * filled}{'░' * (20 - filled)}[/{color}]"

    rows_s = [
        ("test_pass_rate", scores["test_pass_rate"], "Tests Passed"),
        ("lint_score", scores["lint_score"], "Lint Clean"),
        ("efficiency", scores["efficiency_score"], "Efficiency"),
        ("review_quality", scores["review_quality"], "Review Quality"),
        ("reflection", scores["reflection_quality"], "Reflection"),
    ]
    for _, v, label in rows_s:
        score_table.add_row(label, f"{v:.4f}", bar(v))

    passed_str = "[bold green]✓ PASSED[/bold green]" if scores["passed"] else "[bold red]✗ FAILED[/bold red]"
    console.print(Panel(
        step_table,
        title=f"[bold]{task_id}[/bold] [{model}]",
        subtitle=f"Final: [bold yellow]{scores['final_score']:.4f}[/bold yellow] {passed_str} "
                 f"Steps: {episode['total_steps']} Time: {episode['elapsed_s']:.1f}s",
        border_style="blue",
    ))
    console.print(score_table)


def print_leaderboard(rows: List[Dict]) -> None:
    table = Table(
        title="[bold]TeamForge Leaderboard[/bold]",
        box=box.DOUBLE_EDGE,
        show_header=True,
        header_style="bold magenta",
        border_style="magenta",
    )
    table.add_column("Rank", justify="center", width=5)
    table.add_column("Model", width=28)
    table.add_column("TeamForge Score", justify="center", width=16)
    table.add_column("Easy (20%)", justify="center", width=11)
    table.add_column("Medium (35%)", justify="center", width=12)
    table.add_column("Hard (45%)", justify="center", width=11)
    table.add_column("Avg Steps", justify="center", width=10)

    medals = ["🥇", "🥈", "🥉"]
    for i, r in enumerate(rows):
        medal = medals[i] if i < 3 else f"#{i+1}"
        easy = r["tasks"].get("easy_bugfix_chunk_list", {}).get("final_score", 0)
        medium = r["tasks"].get("medium_refactor_stats", {}).get("final_score", 0)
        hard = r["tasks"].get("hard_lru_cache_performance", {}).get("final_score", 0)
        ts = r["teamforge_score"]
        ts_color = "green" if ts >= 0.80 else ("yellow" if ts >= 0.55 else "red")
        table.add_row(
            medal,
            f"[bold]{r['model']}[/bold]",
            f"[{ts_color}]{ts:.4f}[/{ts_color}]",
            f"{easy:.3f}",
            f"{medium:.3f}",
            f"{hard:.3f}",
            str(r["avg_steps"]),
        )
    console.print(table)


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="TeamForge Benchmark Runner")
    parser.add_argument("--model", action="append", dest="models",
                        default=None, help="Model name (repeat for multiple)")
    parser.add_argument("--task", choices=ALL_TASK_IDS + ["all"], default="all")
    parser.add_argument("--compare", action="store_true",
                        help="Only rebuild + print leaderboard from saved results")
    args = parser.parse_args()

    if args.compare:
        rows = build_leaderboard()
        save_leaderboard(rows)
        print_leaderboard(rows)
        return

    models = args.models or ["llama3-8b-8192"]
    task_ids = ALL_TASK_IDS if args.task == "all" else [args.task]

    console.rule("[bold blue]TeamForge Benchmark[/bold blue]")
    console.print(f"[dim]Version {BENCHMARK_VERSION} | {len(models)} model(s) × {len(task_ids)} task(s)[/dim]\n")

    client = OpenAI(api_key=GROQ_API_KEY, base_url=API_BASE_URL)

    for model in models:
        model_dir = RESULTS_DIR / model.replace("/", "_")
        model_dir.mkdir(exist_ok=True)
        agent = BenchmarkAgent(client, model)
        env = TeamForgeEnv()

        console.print(f"\n[bold cyan]▶ Model: {model}[/bold cyan]")
        for task_id in task_ids:
            with Progress(
                SpinnerColumn(),
                TextColumn(f"[bold]{task_id}[/bold]"),
                BarColumn(),
                TimeElapsedColumn(),
                console=console,
                transient=True,
            ) as progress:
                progress.add_task("running", total=None)
                episode = run_episode(env, agent, task_id)

            # Save raw result
            ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
            out_file = model_dir / f"{task_id}_{ts_str}.json"
            out_file.write_text(json.dumps(episode, indent=2))

            print_episode_live(task_id, model, episode)
        env._sandbox.teardown()

    # Rebuild leaderboard
    rows = build_leaderboard()
    save_leaderboard(rows)

    console.print()
    console.rule("[bold magenta]Leaderboard[/bold magenta]")
    print_leaderboard(rows)
    console.print(f"\n[dim]Full results → {RESULTS_DIR}/leaderboard.json[/dim]")
    console.print(f"[dim]Markdown table → {RESULTS_DIR}/leaderboard.md[/dim]")


if __name__ == "__main__":
    main()


# ─── Failure Mode Analyzer ───────────────────────────────────────────────────

def analyze_failures(episodes: list[dict]) -> dict:
    """
    Analyze failure modes across episodes.
    Returns a dict of {failure_type: count}.
    """
    modes = {
        "test_file_modified": 0,
        "max_steps_exceeded": 0,
        "zero_tests_passed": 0,
        "lint_never_run": 0,
        "no_review_written": 0,
        "no_plan_written": 0,
        "immediate_edit_no_plan": 0,
    }
    for ep in episodes:
        log = ep.get("steps_log", [])
        scores = ep.get("scores", {})
        actions = [s["action"] for s in log]

        if not ep.get("done"):
            modes["max_steps_exceeded"] += 1
        if scores.get("test_pass_rate", 1) == 0:
            modes["zero_tests_passed"] += 1
        if "run_lint" not in actions:
            modes["lint_never_run"] += 1
        if "generate_review" not in actions:
            modes["no_review_written"] += 1
        if "plan_step" not in actions:
            modes["no_plan_written"] += 1

        # Check if first code action was edit without prior plan
        first_edit = next((i for i, a in enumerate(actions) if a == "edit_file"), 999)
        first_plan = next((i for i, a in enumerate(actions) if a == "plan_step"), 999)
        if first_edit < first_plan:
            modes["immediate_edit_no_plan"] += 1

        # Test file modification = reward < -0.25 on an edit_file step
        for s in log:
            if s["action"] == "edit_file" and s["reward"] < -0.25:
                modes["test_file_modified"] += 1
                break

    return {k: v for k, v in modes.items() if v > 0}
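
# Example with hypothetical episodes: an episode whose log contains no run_lint
# and no generate_review actions contributes {"lint_never_run": 1,
# "no_review_written": 1}; modes that never occur are dropped from the result.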


def print_failure_analysis(episodes: list[dict], model: str) -> None:
    modes = analyze_failures(episodes)
    if not modes:
        console.print(f" [green]No failure modes detected for {model}[/green]")
        return

    table = Table(
        title=f"[bold red]Failure Mode Analysis – {model}[/bold red]",
        box=box.SIMPLE,
        show_header=True,
        header_style="bold red",
    )
    table.add_column("Failure Mode", width=30)
    table.add_column("Count", justify="right", width=7)
    table.add_column("Implication", width=40)

    implications = {
        "test_file_modified": "Agent tried to cheat → severe penalty applied",
        "max_steps_exceeded": "Agent ran out of steps without completing",
        "zero_tests_passed": "Implementation entirely wrong / syntax error",
        "lint_never_run": "Agent skipped code quality check",
        "no_review_written": "Agent skipped review phase",
        "no_plan_written": "Agent dove straight into coding without planning",
        "immediate_edit_no_plan": "First edit came before first plan step",
    }
    for mode, count in sorted(modes.items(), key=lambda x: -x[1]):
        table.add_row(
            mode.replace("_", " "),
            str(count),
            implications.get(mode, ""),
        )
    console.print(table)
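

# A minimal sketch (not wired into main) of how the failure analyzer could be
# used on saved results; report_failures_for is a hypothetical helper that
# assumes the per-model JSON layout written by main() above.
def report_failures_for(model: str) -> None:
    model_dir = RESULTS_DIR / model.replace("/", "_")
    episodes = [json.loads(f.read_text()) for f in model_dir.glob("*.json")]
    print_failure_analysis(episodes, model)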