auto-swe-agent-ui / eval /run_eval.py
DevilBits's picture
fix: enforce safe empty bounds for tracking data charts and match dataframe list alignments
6085b61
"""Eval harness for auto-swe-agent. Runs golden test cases and reports pass/fail."""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from observability.langfuse_client import get_langfuse
AGENT_DIR = Path(__file__).parent.parent
VENV_PYTHON = AGENT_DIR / "venv" / "bin" / "python"
VENV_PYTEST = AGENT_DIR / "venv" / "bin" / "pytest"
PASSWORD_PDF_ISSUE = """\
Bug/Feature: Support password-protected PDFs
Currently docstream raises a raw PyMuPDF error when a password-protected
PDF is passed. It should raise a clean ExtractionError with a helpful message.
Required changes in docstream/core/extractor_v2.py — extract_structured():
1. Add optional password parameter to extract_structured()
2. Detect encrypted PDFs using fitz.Document.is_encrypted
3. Attempt decryption if password is provided via doc.authenticate(password)
4. Raise ExtractionError("PDF is password protected. Pass password= to extract()")
if PDF is encrypted and no password given
5. Raise ExtractionError("Incorrect password for PDF.") if authenticate() returns False
API change: blocks = docstream.extract("protected.pdf", password="secret")
Write a pytest test in tests/test_password_pdf.py that mocks an encrypted PDF
and verifies the correct ExtractionError is raised when no password is provided.
"""
@dataclass
class EvalCase:
case_id: str
issue: str
workspace: str
validation_cmd: str
reset_cmds: list[str] = field(default_factory=list)
@dataclass
class EvalResult:
case_id: str
passed: bool
iterations_used: int
model_used: str
time_taken: float
error: str = ""
tests_passed: Optional[bool] = None
verification_attempts: int = 0
branch_name: Optional[str] = None
commit_hash: Optional[str] = None
total_cost_usd: float = 0.0
total_tokens: int = 0
most_used_model: Optional[str] = None
circuit_events: int = 0
circuits_open: int = 0
semantic_search_calls: int = 0
lgtm_count: int = 0
needs_fix_count: int = 0
CASES = [
EvalCase(
case_id="add-endpoint-bug",
issue="The /add endpoint in main.py returns string concatenation instead of integer addition. Fix it so pytest tests pass.",
workspace=str(Path("~/Desktop/Projects/auto-swe-agent").expanduser()),
validation_cmd=f"{VENV_PYTEST} test_main.py -q",
reset_cmds=[
"WRITE:~/Desktop/Projects/auto-swe-agent/main.py",
],
),
EvalCase(
case_id="docstream-password-pdf",
issue=PASSWORD_PDF_ISSUE,
workspace=str(Path("~/Desktop/Projects/docstream").expanduser()),
validation_cmd=f"{VENV_PYTEST} tests/test_password_pdf.py -q",
reset_cmds=[
"git -C ~/Desktop/Projects/docstream checkout docstream/core/extractor_v2.py docstream/__init__.py",
"rm -f ~/Desktop/Projects/docstream/tests/test_password_pdf.py",
],
),
]
def _run(cmd: str | list, cwd: str | None = None, timeout: int = 30) -> tuple[int, str]:
result = subprocess.run(
cmd,
shell=isinstance(cmd, str),
capture_output=True,
text=True,
timeout=timeout,
cwd=cwd,
)
return result.returncode, (result.stdout + result.stderr).strip()
BUGGY_MAIN = """from fastapi import FastAPI
app = FastAPI()
@app.get("/add")
def add(a: str, b: str):
return {"result": a + b}
"""
def _reset_case(case: EvalCase) -> None:
for cmd in case.reset_cmds:
if cmd.startswith("WRITE:"):
_, path = cmd.split(":", 1)
path = Path(path.strip()).expanduser()
path.write_text(BUGGY_MAIN)
print(f" [RESET] Wrote {path}:")
print(" " + "\n ".join(BUGGY_MAIN.splitlines()))
# Verify it's valid Python
rc, out = _run(
[
str(VENV_PYTHON),
"-c",
f"import ast; ast.parse(open('{path}').read()); print('syntax OK')",
]
)
print(f" [RESET] Syntax check: {out} (exit {rc})")
else:
print(f" [RESET] Running: {cmd}")
code, out = _run(cmd)
print(f" [RESET] Exit {code}: {out[:200] if out else '(no output)'}")
def _pre_validation_check(case: EvalCase) -> None:
"""Print workspace state before running pytest."""
ws = Path(case.workspace)
print(f" [PRE-VALIDATION] Files in {ws}:")
for p in sorted(ws.rglob("*.py")):
rel = p.relative_to(ws)
# Skip venv and __pycache__
if any(part in ("venv", ".venv", "__pycache__") for part in rel.parts):
continue
print(f" {rel} (exists={p.exists()})")
# Specific file checks
if case.case_id == "add-endpoint-bug":
mp = ws / "main.py"
print(f" [PRE-VALIDATION] main.py exists={mp.exists()}")
if mp.exists():
print(" [PRE-VALIDATION] main.py content:")
print(" " + "\n ".join(mp.read_text().splitlines()))
elif case.case_id == "docstream-password-pdf":
tp = ws / "tests" / "test_password_pdf.py"
print(f" [PRE-VALIDATION] test_password_pdf.py exists={tp.exists()}")
def _run_agent(case: EvalCase, timeout: int = 120) -> tuple[int, str]:
result = subprocess.run(
[
str(VENV_PYTHON),
str(AGENT_DIR / "agent.py"),
case.issue,
"--workspace",
case.workspace,
],
capture_output=True,
text=True,
timeout=timeout,
cwd=str(AGENT_DIR),
env={**os.environ},
)
return result.returncode, (result.stdout + result.stderr).strip()
def _extract_langfuse_trace(output: str) -> Optional[str]:
for line in output.splitlines():
if "[LANGFUSE] Trace:" in line:
return line.split("Trace:")[-1].strip().split("/")[-1].strip()
return None
def _extract_meta(output: str) -> tuple:
"""Parse all fields from the [SUMMARY] line printed by agent main()."""
iterations = output.count("--- [NODE] PLANNER")
model = "unknown"
for line in output.splitlines():
if "--- [NODE] PLANNER | model=" in line:
model = line.split("model=")[-1].strip().rstrip("-").strip()
tests_passed: Optional[bool] = None
verification_attempts = 0
branch_name: Optional[str] = None
commit_hash: Optional[str] = None
total_cost_usd: float = 0.0
total_tokens: int = 0
most_used_model: Optional[str] = None
circuit_events: int = 0
circuits_open: int = 0
semantic_search_calls: int = 0
lgtm_count: int = 0
needs_fix_count: int = 0
for line in output.splitlines():
if "[SUMMARY]" in line:
try:
def _get(key: str) -> str:
parts = line.split(f"{key}=")
if len(parts) < 2:
return "0"
return parts[1].split(" |")[0].strip()
tp_str = _get("tests_passed")
tests_passed = (
True if tp_str == "True" else (False if tp_str == "False" else None)
)
verification_attempts = int(_get("verification_attempts"))
bn = _get("branch_name")
branch_name = None if bn == "None" else bn
ch = _get("commit_hash")
commit_hash = None if ch == "None" else ch
total_cost_usd = float(_get("total_cost_usd"))
total_tokens = int(_get("total_tokens"))
mum = _get("most_used_model")
most_used_model = None if mum == "None" else mum
circuit_events = int(_get("circuit_events"))
circuits_open = int(_get("circuits_open"))
semantic_search_calls = int(_get("semantic_search_calls"))
lgtm_count = int(_get("lgtm"))
needs_fix_count = int(_get("needs_fix"))
except (IndexError, ValueError):
pass
break
return (
iterations,
model,
tests_passed,
verification_attempts,
branch_name,
commit_hash,
total_cost_usd,
total_tokens,
most_used_model,
circuit_events,
circuits_open,
semantic_search_calls,
lgtm_count,
needs_fix_count,
)
def run_eval(cases: list[EvalCase] = CASES) -> list[EvalResult]:
results: list[EvalResult] = []
langfuse = get_langfuse()
eval_trace_ids: list[str] = []
for case in cases:
print(f"\n{'='*60}")
print(f"Running: {case.case_id}")
print(f"{'='*60}")
# Create eval-level trace
eval_trace = langfuse.create_trace(
name=f"eval-{case.case_id}",
metadata={"case_id": case.case_id, "issue_preview": case.issue[:100]},
)
eval_trace_id = eval_trace.id if eval_trace is not None else None
if eval_trace_id:
eval_trace_ids.append(eval_trace_id)
print(" Resetting to buggy state...")
_reset_case(case)
start = time.time()
error = ""
agent_output = ""
try:
code, agent_output = _run_agent(case)
if code != 0:
for line in reversed(agent_output.splitlines()):
if line.strip() and not line.startswith(" "):
error = line.strip()
break
error = error or f"Agent exited {code}"
except subprocess.TimeoutExpired:
error = "Agent timed out (120s)"
elapsed = round(time.time() - start, 1)
print(f" Agent output (last 5 lines):")
for line in agent_output.splitlines()[-5:]:
print(f" {line}")
meta = _extract_meta(agent_output)
(
iterations,
model,
agent_tests_passed,
agent_verification_attempts,
branch_name,
commit_hash,
total_cost_usd,
total_tokens,
most_used_model,
circuit_events,
circuits_open,
semantic_search_calls,
lgtm_count,
needs_fix_count,
) = meta[:14]
agent_trace_id = _extract_langfuse_trace(agent_output)
if agent_trace_id:
print(
f" Langfuse agent trace: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}/trace/{agent_trace_id}"
)
passed = False
if not error:
_pre_validation_check(case)
val_code, val_out = _run(
case.validation_cmd, cwd=case.workspace, timeout=30
)
passed = val_code == 0
if not passed:
error = val_out[:200]
print(f" Validation: {'PASS' if passed else 'FAIL'}")
print(f" {val_out[:300]}")
else:
print(f" Skipping validation — agent error: {error}")
# Score the eval trace
if eval_trace_id:
langfuse.score(
trace_id=eval_trace_id,
name="eval_passed",
value=1.0 if passed else 0.0,
comment=f"time={elapsed}s cost=${total_cost_usd:.4f} iterations={iterations}",
)
if error:
langfuse.score(
trace_id=eval_trace_id,
name="eval_error",
value=0.0,
comment=str(error)[:200],
)
results.append(
EvalResult(
case_id=case.case_id,
passed=passed,
iterations_used=iterations,
model_used=model,
time_taken=elapsed,
error=error,
tests_passed=agent_tests_passed,
verification_attempts=agent_verification_attempts,
branch_name=branch_name,
commit_hash=commit_hash,
total_cost_usd=total_cost_usd,
total_tokens=total_tokens,
most_used_model=most_used_model,
circuit_events=circuit_events,
circuits_open=circuits_open,
semantic_search_calls=semantic_search_calls,
lgtm_count=lgtm_count,
needs_fix_count=needs_fix_count,
)
)
if case != cases[-1]:
print(" [RATE LIMIT] Waiting 30s before next case...")
time.sleep(30)
langfuse.flush()
return results
def print_report(results: list[EvalResult]) -> None:
print(f"\n{'='*80}")
print("EVAL REPORT")
print(f"{'='*80}")
print(
f"| {'Case':<28} | {'Result':<6} | {'Iter':>4} | {'Verify':>6} | {'Cost':>8} | "
f"{'Tokens':>7} | {'Time':>6} | {'CE':>3} | {'SS':>3} | {'LGTM':>4} | {'NF':>3} |"
)
print(
f"|{'-'*30}|{'-'*8}|{'-'*6}|{'-'*8}|{'-'*10}|{'-'*9}|{'-'*8}|{'-'*5}|{'-'*5}|{'-'*6}|{'-'*5}|"
)
for r in results:
status = "PASS" if r.passed else "FAIL"
tp = "✓" if r.tests_passed else ("✗" if r.tests_passed is False else "-")
cost_flag = " !" if r.total_cost_usd > 5.0 else ""
ce_flag = " !" if r.circuits_open > 0 else ""
print(
f"| {r.case_id:<28} | {status:<6} | {r.iterations_used:>4} | {tp:>4}/{r.verification_attempts:<1} | "
f"${r.total_cost_usd:>6.4f}{cost_flag} | {r.total_tokens:>7} | {r.time_taken:>5.1f}s | "
f"{r.circuit_events:>3}{ce_flag} | {r.semantic_search_calls:>3} | "
f"{r.lgtm_count:>4} | {r.needs_fix_count:>3} |"
)
passed = sum(r.passed for r in results)
total_cost = sum(r.total_cost_usd for r in results)
total_ce = sum(r.circuit_events for r in results)
total_ss = sum(r.semantic_search_calls for r in results)
total_lgtm = sum(r.lgtm_count for r in results)
total_nf = sum(r.needs_fix_count for r in results)
print(
f"\n{passed}/{len(results)} passed | Total cost: ${total_cost:.4f} | "
f"Total circuit events: {total_ce} | Total semantic searches: {total_ss} | "
f"LGTM: {total_lgtm} | NEEDS_FIX: {total_nf}"
)
# Print circuit summary for any case with events
for r in results:
if r.circuit_events > 0:
print(
f" {r.case_id}: {r.circuit_events} circuit events "
f"({r.circuits_open} circuits still open at end)"
)
def save_results(results: list[EvalResult]) -> Path:
out_dir = Path(__file__).parent
out_dir.mkdir(exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out_path = out_dir / f"results_{ts}.json"
out_path.write_text(json.dumps([r.__dict__ for r in results], indent=2))
print(f"\nResults saved to {out_path}")
return out_path
if __name__ == "__main__":
results = run_eval()
print_report(results)
save_results(results)