"""
TeamForge Grader
Deterministic scoring of a completed episode.
Final score formula (weighted sum; final value clamped to [0.10, 0.90]):
score = 0.40 * test_pass_rate
+ 0.25 * lint_score
+ 0.20 * efficiency_score
+ 0.10 * review_quality
+ 0.05 * reflection_quality
Anti-exploit measures:
- Test tampering detection (trivial pass/assert True bodies)
- Implementation existence check (no empty stubs)
- Minimum review length requirement
- Lint-fix-only exploit detection
"""
from __future__ import annotations
import ast
import math
import re
import subprocess
import sys
from pathlib import Path
from typing import List, Optional
from models import EpisodeResult, ReviewArtifact, ReflectionArtifact
# ─────────────────────────────────────────────
# SCORING CONFIG
# ─────────────────────────────────────────────
_SCORE_MIN = 0.10  # lower clamp bound; keeps scores off the 0.0 boundary
_SCORE_MAX = 0.90  # upper clamp bound; keeps scores off the 1.0 boundary
def _clamp(score: float) -> float:
"""Ensure score is strictly within the open interval (0, 1)."""
return round(max(_SCORE_MIN, min(_SCORE_MAX, score)), 4)
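# Clamp behaviour follows directly from the constants above, e.g.:
#   _clamp(1.25)     -> 0.9     (capped at _SCORE_MAX)
#   _clamp(-0.50)    -> 0.1     (floored at _SCORE_MIN)
#   _clamp(0.456789) -> 0.4568  (rounded to 4 decimal places)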
# ─────────────────────────────────────────────
# ANTI-EXPLOIT GUARDS
# ─────────────────────────────────────────────
def _detect_test_tampering(repo_path: str) -> bool:
"""
Detect trivially-passing test rewrites.
Flags: single-statement test with `pass`, `return`, or `assert True`.
"""
for test_file in Path(repo_path).rglob("test_*.py"):
try:
src = test_file.read_text()
tree = ast.parse(src)
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"):
body = node.body
if len(body) == 1:
stmt = body[0]
if isinstance(stmt, (ast.Pass, ast.Return)):
return True
if isinstance(stmt, ast.Assert):
val = stmt.test
if isinstance(val, ast.Constant) and val.value is True:
return True
except Exception:
pass
return False
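# Illustration of what the tampering check flags (hypothetical test bodies; `add` is a made-up helper):
#   def test_add():
#       assert True             # flagged: single constant-True assert
#   def test_add():
#       pass                    # flagged: single pass statement
#   def test_add():
#       assert add(2, 2) == 4   # not flagged: asserts on real behaviour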
def _implementation_exists(repo_path: str) -> bool:
"""Verify at least some non-test code was written (>5 non-blank lines)."""
total = 0
for f in Path(repo_path).rglob("*.py"):
if "test" in f.name or "__pycache__" in str(f):
continue
try:
lines = [l for l in f.read_text().splitlines() if l.strip()]
total += len(lines)
except Exception:
pass
return total >= 5
# ─────────────────────────────────────────────
# SUB-SCORERS
# ─────────────────────────────────────────────
def score_tests(repo_path: str, timeout: int = 60) -> tuple[float, str]:
"""Run pytest and return (pass_rate 0-1, output)."""
result = subprocess.run(
[sys.executable, "-m", "pytest", "--tb=short", "-q", "--no-header"],
cwd=repo_path,
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout + result.stderr
passed = failed = errors = 0
m_p = re.search(r"(\d+) passed", output)
m_f = re.search(r"(\d+) failed", output)
m_e = re.search(r"(\d+) error", output)
if m_p: passed = int(m_p.group(1))
if m_f: failed = int(m_f.group(1))
if m_e: errors = int(m_e.group(1))
total = passed + failed + errors
if total == 0:
return _SCORE_MIN, output
pass_rate = passed / total
return _clamp(pass_rate), output
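# Example of the summary parsing above (hypothetical pytest output line):
#   "1 failed, 3 passed in 0.42s" -> passed=3, failed=1, errors=0
#   pass_rate = 3 / 4 = 0.75      -> _clamp(0.75) = 0.75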
def score_lint(repo_path: str) -> tuple[float, str]:
"""Run ruff and return (lint_score 0-1, output)."""
result = subprocess.run(
[sys.executable, "-m", "ruff", "check", "."],
cwd=repo_path,
capture_output=True,
text=True,
)
output = result.stdout + result.stderr
if result.returncode == 0:
return _SCORE_MAX, "No lint violations."
violations = len([
ln for ln in output.splitlines()
if re.match(r".+:\d+:\d+:", ln)
])
# Each violation costs 0.07; _clamp keeps the result within [_SCORE_MIN, _SCORE_MAX]
raw_score = 1.0 - violations * 0.07
return _clamp(raw_score), output
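# Example of the violation penalty (hypothetical ruff runs):
#   4 matched violation lines  -> raw_score = 1.0 - 4 * 0.07  = 0.72  -> _clamp -> 0.72
#   20 matched violation lines -> raw_score = 1.0 - 20 * 0.07 = -0.40 -> _clamp -> 0.1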
def score_review_quality(
reviews: List[ReviewArtifact],
required_keywords: List[str],
) -> float:
"""Keyword-based review quality with minimum length requirement."""
if not reviews:
return _SCORE_MIN
combined = " ".join(r.text.lower() for r in reviews)
# Anti-exploit: minimum meaningful length
if len(combined.strip()) < 40:
return _SCORE_MIN + 0.05
# Keyword coverage
if not required_keywords:
kw_score = 0.5
else:
found = sum(1 for kw in required_keywords if kw.lower() in combined)
kw_score = found / len(required_keywords)
# Length bonus (up to 0.2 for thorough reviews)
avg_len = sum(len(r.text) for r in reviews) / len(reviews)
length_bonus = min(0.2, avg_len / 600)
# Specificity bonus: mentions actual code identifiers
code_words = re.findall(r'\b[a-z_]{3,}\(\)', combined)
specificity = min(0.1, len(set(code_words)) * 0.025)
return _clamp(kw_score * 0.7 + length_bonus + specificity)
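# Example scoring (hypothetical review: 2 of 4 required keywords found, average length
# 300 chars, two distinct `foo()`-style identifiers mentioned):
#   kw_score = 0.5, length_bonus = min(0.2, 300/600) = 0.2, specificity = 2 * 0.025 = 0.05
#   review_quality = _clamp(0.5 * 0.7 + 0.2 + 0.05) = 0.6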
def score_reflection_quality(reflections: List[ReflectionArtifact]) -> float:
"""Score reflections on depth and actionability."""
if not reflections:
return _SCORE_MIN
total = 0.0
for ref in reflections:
depth = 0.0
if len(ref.what_went_well.strip()) > 40:
depth += 0.5
if len(ref.what_to_improve.strip()) > 40:
depth += 0.5
# Bonus if adjusted_plan provided
if ref.adjusted_plan and len(ref.adjusted_plan.strip()) > 20:
depth = min(0.9, depth + 0.2)
total += depth
# Average reflection depth, clamped to [_SCORE_MIN, _SCORE_MAX]
return _clamp(total / max(1, len(reflections)))
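# Example depth calculation (hypothetical single reflection):
#   what_went_well > 40 chars (+0.5) and what_to_improve > 40 chars (+0.5) -> depth = 1.0
#   adding an adjusted_plan > 20 chars routes through min(0.9, 1.2), capping depth at 0.9
#   score = _clamp(0.9 / 1) = 0.9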
def score_efficiency(total_steps: int, max_steps: int) -> float:
"""Reward solving in fewer steps with smooth decay curve."""
ratio = total_steps / max_steps
# Strictly (0, 1) - Safer interior
return _clamp(math.exp(-2.0 * max(0, ratio - 0.25)))
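# Sample points on the decay curve (ratio = total_steps / max_steps):
#   ratio <= 0.25 -> exp(0)    = 1.0    -> clamped to 0.9
#   ratio  = 0.50 -> exp(-0.5) ~ 0.6065
#   ratio  = 1.00 -> exp(-1.5) ~ 0.2231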
# ─────────────────────────────────────────────
# MAIN GRADER
# ─────────────────────────────────────────────
def grade_episode(
repo_path: str,
task_id: str,
total_steps: int,
max_steps: int,
reviews: List[ReviewArtifact],
reflections: List[ReflectionArtifact],
required_keywords: Optional[List[str]] = None,
) -> EpisodeResult:
"""
Run full deterministic grading pipeline.
Returns EpisodeResult with final_score clamped to [_SCORE_MIN, _SCORE_MAX], i.e. [0.10, 0.90].
"""
log: List[str] = []
required_keywords = required_keywords or []
# ── Anti-exploit checks ──
if _detect_test_tampering(repo_path):
log.append("[GRADER] ⚠ TEST TAMPERING DETECTED β€” score zeroed")
return EpisodeResult(
task_id=task_id, total_steps=total_steps,
final_score=_SCORE_MIN, passed=False,
log=log + ["Test files were trivially rewritten to force passes."],
)
if not _implementation_exists(repo_path):
log.append("[GRADER] ⚠ NO IMPLEMENTATION FOUND β€” score zeroed")
return EpisodeResult(
task_id=task_id, total_steps=total_steps,
final_score=_SCORE_MIN, passed=False,
log=log + ["No non-test code was written."],
)
# ── Tests ──
test_pass_rate, test_output = score_tests(repo_path)
log.append(f"[GRADER] test_pass_rate={test_pass_rate:.3f}")
log.append(f"[GRADER] test_output=\n{test_output[:800]}")
# ── Lint ──
lint_score, lint_output = score_lint(repo_path)
log.append(f"[GRADER] lint_score={lint_score:.3f}")
# ── Efficiency ──
efficiency = score_efficiency(total_steps, max_steps)
log.append(f"[GRADER] efficiency={efficiency:.3f} ({total_steps}/{max_steps} steps)")
# ── Review quality ──
review_q = score_review_quality(reviews, required_keywords)
log.append(f"[GRADER] review_quality={review_q:.3f}")
# ── Reflection quality ──
reflect_q = score_reflection_quality(reflections)
log.append(f"[GRADER] reflection_quality={reflect_q:.3f}")
# ── Final weighted score ──
final = (
0.40 * test_pass_rate
+ 0.25 * lint_score
+ 0.20 * efficiency
+ 0.10 * review_q
+ 0.05 * reflect_q
)
# Clamp to [_SCORE_MIN, _SCORE_MAX] so the reported score never touches the 0.0/1.0 boundaries (Phase 2 validator requirement)
final = _clamp(final)
log.append(f"[GRADER] FINAL_SCORE={final:.4f}")
return EpisodeResult(
task_id=task_id,
total_steps=total_steps,
test_pass_rate=test_pass_rate,
lint_score=lint_score,
efficiency_score=efficiency,
review_quality=review_q,
reflection_quality=reflect_q,
final_score=final,
passed=test_pass_rate >= 0.9 and lint_score >= 0.7,
log=log,
)
def grade_task(repo_path: str, **kwargs) -> float:
"""
OpenEnv standard grader bridge – entry point from YAML.
Returns only a float (not an EpisodeResult), clamped to [_SCORE_MIN, _SCORE_MAX].
"""
import json
import os
from pydantic import TypeAdapter
metadata_path = os.path.join(repo_path, "grading_metadata.json")
# Default fallback values for out-of-band grading
task_id = "unknown"
total_steps = 1
max_steps = 20
reviews = []
reflections = []
required_keywords = []
if os.path.exists(metadata_path):
try:
with open(metadata_path, "r") as f:
meta = json.load(f)
task_id = meta.get("task_id", task_id)
total_steps = meta.get("total_steps", total_steps)
max_steps = meta.get("max_steps", max_steps)
# Use TypeAdapter for robust Pydantic deserialization of the artifact lists
reviews = TypeAdapter(List[ReviewArtifact]).validate_python(meta.get("reviews", []))
reflections = TypeAdapter(List[ReflectionArtifact]).validate_python(meta.get("reflections", []))
required_keywords = meta.get("required_keywords", [])
except Exception:
pass  # malformed metadata: keep the defaults above
try:
result = grade_episode(
repo_path=repo_path,
task_id=task_id,
total_steps=total_steps,
max_steps=max_steps,
reviews=reviews,
reflections=reflections,
required_keywords=required_keywords,
)
return float(result.final_score)
except Exception:
# Any grading failure falls back to the floor score rather than raising into the harness
return _SCORE_MIN
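# Sketch of the grading_metadata.json shape grade_task() reads. Top-level keys come from the
# parsing above; the per-artifact keys ("text", "what_went_well", "what_to_improve",
# "adjusted_plan") are inferred from attribute access in this module and may not match the
# actual Pydantic models exactly:
#
#   {
#     "task_id": "demo-task",
#     "total_steps": 12,
#     "max_steps": 20,
#     "reviews": [{"text": "parse_config() still misses the empty-file edge case ..."}],
#     "reflections": [{"what_went_well": "...", "what_to_improve": "...", "adjusted_plan": "..."}],
#     "required_keywords": ["edge case", "parse_config"]
#   }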